Merge remote-tracking branch 'guv/reflection-in-io' into fixes-2022
This commit is contained in:
commit
5872fa5147
1 changed files with 36 additions and 31 deletions
|
|
@ -1,7 +1,12 @@
|
|||
(ns net.cgrand.xforms.io
|
||||
(:require [clojure.java.io :as io]
|
||||
[clojure.java.shell :as sh]
|
||||
[clojure.edn :as edn]))
|
||||
[clojure.java.shell :as sh]
|
||||
[clojure.edn :as edn])
|
||||
(:import (java.io Reader BufferedReader IOException InputStream OutputStream BufferedWriter Writer PushbackReader InputStreamReader OutputStreamWriter Closeable)
|
||||
(java.util Arrays List)
|
||||
(java.util.concurrent ArrayBlockingQueue)
|
||||
(java.lang ProcessBuilder$Redirect)
|
||||
(clojure.lang IFn Fn IReduce)))
|
||||
|
||||
(defn keep-opts [m like]
|
||||
(let [ns (namespace like)]
|
||||
|
|
@ -17,11 +22,11 @@
|
|||
Input is automatically closed upon completion or error."
|
||||
[in & opts]
|
||||
(let [no-init (Object.)]
|
||||
(reify clojure.lang.IReduce
|
||||
(reify IReduce
|
||||
(reduce [self f] (.reduce self f no-init))
|
||||
(reduce [self f init]
|
||||
(with-open [rdr (apply io/reader in opts)]
|
||||
(let [rdr (cond-> rdr (not (instance? java.io.BufferedReader rdr)) java.io.BufferedReader.)
|
||||
(with-open [^Reader rdr (apply io/reader in opts)]
|
||||
(let [^BufferedReader rdr (cond-> rdr (not (instance? BufferedReader rdr)) (BufferedReader.))
|
||||
init (if (identical? init no-init)
|
||||
(or (.readLine rdr) (f))
|
||||
init)]
|
||||
|
|
@ -34,17 +39,17 @@
|
|||
state))))))))
|
||||
|
||||
(defn lines-out
|
||||
"1-2 args: reducing function that writes values serialized to its accumulator (a java.io.Writer).
|
||||
"1-2 args: reducing function that writes values serialized to its accumulator (a java.io.BufferedWriter).
|
||||
3+ args: transducing context that writes transformed values to the specified output. The output is
|
||||
coerced to a Writer by passing out and opts to clojure.java.io/writer. The output is automatically closed.
|
||||
coerced to a BufferedWriter by passing out and opts to clojure.java.io/writer. The output is automatically closed.
|
||||
Returns the writer."
|
||||
([w] w)
|
||||
([^java.io.Writer w line]
|
||||
([^BufferedWriter w line]
|
||||
(doto w
|
||||
(.write (str line))
|
||||
(.newLine)))
|
||||
([out xform coll & opts]
|
||||
(with-open [w (apply io/writer out opts)]
|
||||
(with-open [^Writer w (apply io/writer out opts)]
|
||||
(transduce xform lines-out w coll))))
|
||||
|
||||
(defn edn-in
|
||||
|
|
@ -56,11 +61,11 @@
|
|||
edn/read"
|
||||
[in & {:as opts}]
|
||||
(let [no-init (Object.)]
|
||||
(reify clojure.lang.IReduce
|
||||
(reify IReduce
|
||||
(reduce [self f] (.reduce self f no-init))
|
||||
(reduce [self f init]
|
||||
(with-open [rdr (apply io/reader in (mapcat seq (keep-opts opts ::io/opts)))]
|
||||
(let [rdr (cond-> rdr (not (instance? java.io.PushbackReader rdr)) java.io.PushbackReader.)
|
||||
(with-open [^Reader rdr (apply io/reader in (mapcat seq (keep-opts opts ::io/opts)))]
|
||||
(let [^BufferedReader rdr (cond-> rdr (not (instance? PushbackReader rdr)) PushbackReader.)
|
||||
opts (assoc (keep-opts opts ::edn/opts) :eof no-init)
|
||||
init (if (identical? init no-init)
|
||||
(let [form (edn/read opts rdr)]
|
||||
|
|
@ -83,7 +88,7 @@
|
|||
coerced to a Writer by passing out and opts to clojure.java.io/writer. The output is automatically closed.
|
||||
Returns the writer."
|
||||
([w] w)
|
||||
([^java.io.Writer w x]
|
||||
([^Writer w x]
|
||||
(binding [*out* w
|
||||
*print-length* nil
|
||||
*print-level* nil
|
||||
|
|
@ -93,7 +98,7 @@
|
|||
(prn x)
|
||||
w))
|
||||
([out xform coll & opts]
|
||||
(with-open [w (apply io/writer out opts)]
|
||||
(with-open [^Writer w (apply io/writer out opts)]
|
||||
(transduce xform edn-out w coll))))
|
||||
|
||||
(defn- stream-spec [x]
|
||||
|
|
@ -108,32 +113,32 @@
|
|||
* :dir, the current dir (defaults to clojure.java.shell/*sh-dir* or JVM current dir),
|
||||
* :in and :out which are maps with keys :mode (:lines (default), :text or :bytes) and :enc (defaults to \"UTF-8\");
|
||||
encoding applies only for modes :lines or :text; shorthands exist: a single keyword is equivalent to {:mode k :enc \"UTF-8\"},
|
||||
a single string is equivelent to {:mode :lines, :enc s}.
|
||||
a single string is equivalent to {:mode :lines, :enc s}.
|
||||
In :bytes mode, values are bytes array.
|
||||
In :lines mode, values are strings representing lines without line delimiters.
|
||||
In :text mode, values are strings."
|
||||
{:arglists '([cmd arg1 ... argN & opts])}
|
||||
[& args]
|
||||
(reify
|
||||
clojure.lang.IReduce
|
||||
IReduce
|
||||
(reduce [self rf]
|
||||
(reduce rf (eduction self nil))) ; quick way to handle no init
|
||||
(reduce [self rf init]
|
||||
(let [xf (self rf)]
|
||||
(xf init)))
|
||||
clojure.lang.Fn
|
||||
clojure.lang.IFn
|
||||
Fn
|
||||
IFn
|
||||
(invoke [_ rf]
|
||||
(let [[cmd [& {:as opts :keys [env in out dir] :or {dir sh/*sh-dir*}}]] (split-with string? args)
|
||||
env (into (or sh/*sh-env* {}) env)
|
||||
env (into {} (for [[k v] env] [(name k) (str v)]))
|
||||
proc (-> ^java.util.List (map str cmd) ProcessBuilder.
|
||||
(.redirectError java.lang.ProcessBuilder$Redirect/INHERIT)
|
||||
proc (-> ^List (map str cmd) ProcessBuilder.
|
||||
(.redirectError ProcessBuilder$Redirect/INHERIT)
|
||||
(doto (-> .environment (.putAll env)))
|
||||
(.directory (io/as-file dir))
|
||||
.start)
|
||||
EOS (Object.)
|
||||
q (java.util.concurrent.ArrayBlockingQueue. 16)
|
||||
q (ArrayBlockingQueue. 16)
|
||||
drain (fn [acc]
|
||||
(loop [acc acc]
|
||||
(if-some [x (.poll q)]
|
||||
|
|
@ -146,41 +151,41 @@
|
|||
acc)))
|
||||
in (stream-spec in)
|
||||
out (stream-spec out)
|
||||
stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (java.io.OutputStreamWriter. (:enc in)) java.io.BufferedWriter.))
|
||||
stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (java.io.InputStreamReader. (:enc out)) java.io.BufferedReader.))
|
||||
^Closeable stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (OutputStreamWriter. ^String (:enc in)) BufferedWriter.))
|
||||
stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (InputStreamReader. ^String (:enc out)) BufferedReader.))
|
||||
write!
|
||||
(case (:mode in)
|
||||
:lines
|
||||
(fn [x]
|
||||
(doto ^java.io.BufferedWriter stdin
|
||||
(doto ^BufferedWriter stdin
|
||||
(.write (str x))
|
||||
.newLine))
|
||||
:text
|
||||
(fn [x]
|
||||
(.write ^java.io.BufferedWriter stdin (str x)))
|
||||
(.write ^BufferedWriter stdin (str x)))
|
||||
:bytes
|
||||
(fn [^bytes x]
|
||||
(.write ^java.io.OutputStream stdin x)))]
|
||||
(.write ^OutputStream stdin x)))]
|
||||
(-> (case (:mode out)
|
||||
:lines
|
||||
#(loop []
|
||||
(if-some [s (.readLine ^java.io.BufferedReader stdout)]
|
||||
(if-some [s (.readLine ^BufferedReader stdout)]
|
||||
(do (.put q s) (recur))
|
||||
(.put q EOS)))
|
||||
:text
|
||||
#(let [buf (char-array 1024)]
|
||||
(loop []
|
||||
(let [n (.read ^java.io.BufferedReader stdout buf)]
|
||||
(let [n (.read ^BufferedReader stdout buf)]
|
||||
(if (neg? n)
|
||||
(.put q EOS)
|
||||
(do (.put q (String. buf 0 n)) (recur))))))
|
||||
:bytes
|
||||
#(let [buf (byte-array 1024)]
|
||||
(loop []
|
||||
(let [n (.read ^java.io.InputStream stdout buf)]
|
||||
(let [n (.read ^InputStream stdout buf)]
|
||||
(if (neg? n)
|
||||
(.put q EOS)
|
||||
(do (.put q (java.util.Arrays/copyOf buf n)) (recur)))))))
|
||||
(do (.put q (Arrays/copyOf buf n)) (recur)))))))
|
||||
Thread. .start)
|
||||
(fn
|
||||
([] (rf))
|
||||
|
|
@ -197,5 +202,5 @@
|
|||
(when-not (reduced? acc)
|
||||
(write! x))
|
||||
acc
|
||||
(catch java.io.IOException e
|
||||
(catch IOException e
|
||||
(ensure-reduced acc))))))))))
|
||||
Loading…
Reference in a new issue