Fixes reflection usage in xforms.io.

This commit is contained in:
Gunnar Völkel 2020-05-27 10:40:09 +02:00
parent 62375212a8
commit e9216a7d71
2 changed files with 38 additions and 33 deletions

View file

@ -1,8 +1,8 @@
(defproject net.cgrand/xforms "0.19.2" (defproject net.cgrand/xforms "0.19.3-SNAPSHOT"
:description "Extra transducers for Clojure" :description "Extra transducers for Clojure"
:url "https://github.com/cgrand/xforms" :url "https://github.com/cgrand/xforms"
:license {:name "Eclipse Public License" :license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"} :url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.8.0" :scope "provided"] :dependencies [[org.clojure/clojure "1.8.0" :scope "provided"]
[org.clojure/clojurescript "1.9.293" :scope "provided"] [org.clojure/clojurescript "1.9.293" :scope "provided"]
[net.cgrand/macrovich "0.2.0"]]) [net.cgrand/macrovich "0.2.0"]])

View file

@ -1,7 +1,12 @@
(ns net.cgrand.xforms.io (ns net.cgrand.xforms.io
(:require [clojure.java.io :as io] (:require [clojure.java.io :as io]
[clojure.java.shell :as sh] [clojure.java.shell :as sh]
[clojure.edn :as edn])) [clojure.edn :as edn])
(:import (java.io Reader BufferedReader IOException InputStream OutputStream BufferedWriter Writer PushbackReader InputStreamReader OutputStreamWriter Closeable)
(java.util Arrays List)
(java.util.concurrent ArrayBlockingQueue)
(java.lang ProcessBuilder$Redirect)
(clojure.lang IFn Fn IReduce)))
(defn keep-opts [m like] (defn keep-opts [m like]
(let [ns (namespace like)] (let [ns (namespace like)]
@ -17,11 +22,11 @@
Input is automatically closed upon completion or error." Input is automatically closed upon completion or error."
[in & opts] [in & opts]
(let [no-init (Object.)] (let [no-init (Object.)]
(reify clojure.lang.IReduce (reify IReduce
(reduce [self f] (.reduce self f no-init)) (reduce [self f] (.reduce self f no-init))
(reduce [self f init] (reduce [self f init]
(with-open [rdr (apply io/reader in opts)] (with-open [^Reader rdr (apply io/reader in opts)]
(let [rdr (cond-> rdr (not (instance? java.io.BufferedReader rdr)) java.io.BufferedReader.) (let [^BufferedReader rdr (cond-> rdr (not (instance? BufferedReader rdr)) (BufferedReader.))
init (if (identical? init no-init) init (if (identical? init no-init)
(or (.readLine rdr) (f)) (or (.readLine rdr) (f))
init)] init)]
@ -34,17 +39,17 @@
state)))))))) state))))))))
(defn lines-out (defn lines-out
"1-2 args: reducing function that writes values serialized to its accumulator (a java.io.Writer). "1-2 args: reducing function that writes values serialized to its accumulator (a java.io.BufferedWriter).
3+ args: transducing context that writes transformed values to the specified output. The output is 3+ args: transducing context that writes transformed values to the specified output. The output is
coerced to a Writer by passing out and opts to clojure.java.io/writer. The output is automatically closed. coerced to a BufferedWriter by passing out and opts to clojure.java.io/writer. The output is automatically closed.
Returns the writer." Returns the writer."
([w] w) ([w] w)
([^java.io.Writer w line] ([^BufferedWriter w line]
(doto w (doto w
(.write (str line)) (.write (str line))
(.newLine))) (.newLine)))
([out xform coll & opts] ([out xform coll & opts]
(with-open [w (apply io/writer out opts)] (with-open [^Writer w (apply io/writer out opts)]
(transduce xform lines-out w coll)))) (transduce xform lines-out w coll))))
(defn edn-in (defn edn-in
@ -56,11 +61,11 @@
edn/read" edn/read"
[in & {:as opts}] [in & {:as opts}]
(let [no-init (Object.)] (let [no-init (Object.)]
(reify clojure.lang.IReduce (reify IReduce
(reduce [self f] (.reduce self f no-init)) (reduce [self f] (.reduce self f no-init))
(reduce [self f init] (reduce [self f init]
(with-open [rdr (apply io/reader in (mapcat seq (keep-opts opts ::io/opts)))] (with-open [^Reader rdr (apply io/reader in (mapcat seq (keep-opts opts ::io/opts)))]
(let [rdr (cond-> rdr (not (instance? java.io.PushbackReader rdr)) java.io.PushbackReader.) (let [^BufferedReader rdr (cond-> rdr (not (instance? PushbackReader rdr)) PushbackReader.)
opts (assoc (keep-opts opts ::edn/opts) :eof no-init) opts (assoc (keep-opts opts ::edn/opts) :eof no-init)
init (if (identical? init no-init) init (if (identical? init no-init)
(let [form (edn/read opts rdr)] (let [form (edn/read opts rdr)]
@ -83,7 +88,7 @@
coerced to a Writer by passing out and opts to clojure.java.io/writer. The output is automatically closed. coerced to a Writer by passing out and opts to clojure.java.io/writer. The output is automatically closed.
Returns the writer." Returns the writer."
([w] w) ([w] w)
([^java.io.Writer w x] ([^Writer w x]
(binding [*out* w (binding [*out* w
*print-length* nil *print-length* nil
*print-level* nil *print-level* nil
@ -93,7 +98,7 @@
(prn x) (prn x)
w)) w))
([out xform coll & opts] ([out xform coll & opts]
(with-open [w (apply io/writer out opts)] (with-open [^Writer w (apply io/writer out opts)]
(transduce xform edn-out w coll)))) (transduce xform edn-out w coll))))
(defn- stream-spec [x] (defn- stream-spec [x]
@ -108,32 +113,32 @@
* :dir, the current dir (defaults to clojure.java.shell/*sh-dir* or JVM current dir), * :dir, the current dir (defaults to clojure.java.shell/*sh-dir* or JVM current dir),
* :in and :out which are maps with keys :mode (:lines (default), :text or :bytes) and :enc (defaults to \"UTF-8\"); * :in and :out which are maps with keys :mode (:lines (default), :text or :bytes) and :enc (defaults to \"UTF-8\");
encoding applies only for modes :lines or :text; shorthands exist: a single keyword is equivalent to {:mode k :enc \"UTF-8\"}, encoding applies only for modes :lines or :text; shorthands exist: a single keyword is equivalent to {:mode k :enc \"UTF-8\"},
a single string is equivelent to {:mode :lines, :enc s}. a single string is equivalent to {:mode :lines, :enc s}.
In :bytes mode, values are bytes array. In :bytes mode, values are bytes array.
In :lines mode, values are strings representing lines without line delimiters. In :lines mode, values are strings representing lines without line delimiters.
In :text mode, values are strings." In :text mode, values are strings."
{:arglists '([cmd arg1 ... argN & opts])} {:arglists '([cmd arg1 ... argN & opts])}
[& args] [& args]
(reify (reify
clojure.lang.IReduce IReduce
(reduce [self rf] (reduce [self rf]
(reduce rf (eduction self nil))) ; quick way to handle no init (reduce rf (eduction self nil))) ; quick way to handle no init
(reduce [self rf init] (reduce [self rf init]
(let [xf (self rf)] (let [xf (self rf)]
(xf init))) (xf init)))
clojure.lang.Fn Fn
clojure.lang.IFn IFn
(invoke [_ rf] (invoke [_ rf]
(let [[cmd [& {:as opts :keys [env in out dir] :or {dir sh/*sh-dir*}}]] (split-with string? args) (let [[cmd [& {:as opts :keys [env in out dir] :or {dir sh/*sh-dir*}}]] (split-with string? args)
env (into (or sh/*sh-env* {}) env) env (into (or sh/*sh-env* {}) env)
env (into {} (for [[k v] env] [(name k) (str v)])) env (into {} (for [[k v] env] [(name k) (str v)]))
proc (-> ^java.util.List (map str cmd) ProcessBuilder. proc (-> ^List (map str cmd) ProcessBuilder.
(.redirectError java.lang.ProcessBuilder$Redirect/INHERIT) (.redirectError ProcessBuilder$Redirect/INHERIT)
(doto (-> .environment (.putAll env))) (doto (-> .environment (.putAll env)))
(.directory (io/as-file dir)) (.directory (io/as-file dir))
.start) .start)
EOS (Object.) EOS (Object.)
q (java.util.concurrent.ArrayBlockingQueue. 16) q (ArrayBlockingQueue. 16)
drain (fn [acc] drain (fn [acc]
(loop [acc acc] (loop [acc acc]
(if-some [x (.poll q)] (if-some [x (.poll q)]
@ -146,41 +151,41 @@
acc))) acc)))
in (stream-spec in) in (stream-spec in)
out (stream-spec out) out (stream-spec out)
stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (java.io.OutputStreamWriter. (:enc in)) java.io.BufferedWriter.)) ^Closeable stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (OutputStreamWriter. ^String (:enc in)) BufferedWriter.))
stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (java.io.InputStreamReader. (:enc out)) java.io.BufferedReader.)) stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (InputStreamReader. ^String (:enc out)) BufferedReader.))
write! write!
(case (:mode in) (case (:mode in)
:lines :lines
(fn [x] (fn [x]
(doto ^java.io.BufferedWriter stdin (doto ^BufferedWriter stdin
(.write (str x)) (.write (str x))
.newLine)) .newLine))
:text :text
(fn [x] (fn [x]
(.write ^java.io.BufferedWriter stdin (str x))) (.write ^BufferedWriter stdin (str x)))
:bytes :bytes
(fn [^bytes x] (fn [^bytes x]
(.write ^java.io.OutputStream stdin x)))] (.write ^OutputStream stdin x)))]
(-> (case (:mode out) (-> (case (:mode out)
:lines :lines
#(loop [] #(loop []
(if-some [s (.readLine ^java.io.BufferedReader stdout)] (if-some [s (.readLine ^BufferedReader stdout)]
(do (.put q s) (recur)) (do (.put q s) (recur))
(.put q EOS))) (.put q EOS)))
:text :text
#(let [buf (char-array 1024)] #(let [buf (char-array 1024)]
(loop [] (loop []
(let [n (.read ^java.io.BufferedReader stdout buf)] (let [n (.read ^BufferedReader stdout buf)]
(if (neg? n) (if (neg? n)
(.put q EOS) (.put q EOS)
(do (.put q (String. buf 0 n)) (recur)))))) (do (.put q (String. buf 0 n)) (recur))))))
:bytes :bytes
#(let [buf (byte-array 1024)] #(let [buf (byte-array 1024)]
(loop [] (loop []
(let [n (.read ^java.io.InputStream stdout buf)] (let [n (.read ^InputStream stdout buf)]
(if (neg? n) (if (neg? n)
(.put q EOS) (.put q EOS)
(do (.put q (java.util.Arrays/copyOf buf n)) (recur))))))) (do (.put q (Arrays/copyOf buf n)) (recur)))))))
Thread. .start) Thread. .start)
(fn (fn
([] (rf)) ([] (rf))
@ -197,5 +202,5 @@
(when-not (reduced? acc) (when-not (reduced? acc)
(write! x)) (write! x))
acc acc
(catch java.io.IOException e (catch IOException e
(ensure-reduced acc)))))))))) (ensure-reduced acc))))))))))