diff --git a/project.clj b/project.clj index 771ae0b..bc72813 100644 --- a/project.clj +++ b/project.clj @@ -1,8 +1,8 @@ -(defproject net.cgrand/xforms "0.19.2" +(defproject net.cgrand/xforms "0.19.3-SNAPSHOT" :description "Extra transducers for Clojure" :url "https://github.com/cgrand/xforms" :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} :dependencies [[org.clojure/clojure "1.8.0" :scope "provided"] [org.clojure/clojurescript "1.9.293" :scope "provided"] - [net.cgrand/macrovich "0.2.0"]]) \ No newline at end of file + [net.cgrand/macrovich "0.2.0"]]) diff --git a/src/net/cgrand/xforms/io.clj b/src/net/cgrand/xforms/io.clj index ef10042..1b871c8 100644 --- a/src/net/cgrand/xforms/io.clj +++ b/src/net/cgrand/xforms/io.clj @@ -1,7 +1,12 @@ (ns net.cgrand.xforms.io (:require [clojure.java.io :as io] - [clojure.java.shell :as sh] - [clojure.edn :as edn])) + [clojure.java.shell :as sh] + [clojure.edn :as edn]) + (:import (java.io Reader BufferedReader IOException InputStream OutputStream BufferedWriter Writer PushbackReader InputStreamReader OutputStreamWriter Closeable) + (java.util Arrays List) + (java.util.concurrent ArrayBlockingQueue) + (java.lang ProcessBuilder$Redirect) + (clojure.lang IFn Fn IReduce))) (defn keep-opts [m like] (let [ns (namespace like)] @@ -17,11 +22,11 @@ Input is automatically closed upon completion or error." [in & opts] (let [no-init (Object.)] - (reify clojure.lang.IReduce + (reify IReduce (reduce [self f] (.reduce self f no-init)) (reduce [self f init] - (with-open [rdr (apply io/reader in opts)] - (let [rdr (cond-> rdr (not (instance? java.io.BufferedReader rdr)) java.io.BufferedReader.) + (with-open [^Reader rdr (apply io/reader in opts)] + (let [^BufferedReader rdr (cond-> rdr (not (instance? BufferedReader rdr)) (BufferedReader.)) init (if (identical? init no-init) (or (.readLine rdr) (f)) init)] @@ -34,17 +39,17 @@ state)))))))) (defn lines-out - "1-2 args: reducing function that writes values serialized to its accumulator (a java.io.Writer). + "1-2 args: reducing function that writes values serialized to its accumulator (a java.io.BufferedWriter). 3+ args: transducing context that writes transformed values to the specified output. The output is - coerced to a Writer by passing out and opts to clojure.java.io/writer. The output is automatically closed. + coerced to a BufferedWriter by passing out and opts to clojure.java.io/writer. The output is automatically closed. Returns the writer." ([w] w) - ([^java.io.Writer w line] + ([^BufferedWriter w line] (doto w (.write (str line)) (.newLine))) ([out xform coll & opts] - (with-open [w (apply io/writer out opts)] + (with-open [^Writer w (apply io/writer out opts)] (transduce xform lines-out w coll)))) (defn edn-in @@ -56,11 +61,11 @@ edn/read" [in & {:as opts}] (let [no-init (Object.)] - (reify clojure.lang.IReduce + (reify IReduce (reduce [self f] (.reduce self f no-init)) (reduce [self f init] - (with-open [rdr (apply io/reader in (mapcat seq (keep-opts opts ::io/opts)))] - (let [rdr (cond-> rdr (not (instance? java.io.PushbackReader rdr)) java.io.PushbackReader.) + (with-open [^Reader rdr (apply io/reader in (mapcat seq (keep-opts opts ::io/opts)))] + (let [^BufferedReader rdr (cond-> rdr (not (instance? PushbackReader rdr)) PushbackReader.) opts (assoc (keep-opts opts ::edn/opts) :eof no-init) init (if (identical? init no-init) (let [form (edn/read opts rdr)] @@ -83,7 +88,7 @@ coerced to a Writer by passing out and opts to clojure.java.io/writer. The output is automatically closed. Returns the writer." ([w] w) - ([^java.io.Writer w x] + ([^Writer w x] (binding [*out* w *print-length* nil *print-level* nil @@ -93,7 +98,7 @@ (prn x) w)) ([out xform coll & opts] - (with-open [w (apply io/writer out opts)] + (with-open [^Writer w (apply io/writer out opts)] (transduce xform edn-out w coll)))) (defn- stream-spec [x] @@ -108,32 +113,32 @@ * :dir, the current dir (defaults to clojure.java.shell/*sh-dir* or JVM current dir), * :in and :out which are maps with keys :mode (:lines (default), :text or :bytes) and :enc (defaults to \"UTF-8\"); encoding applies only for modes :lines or :text; shorthands exist: a single keyword is equivalent to {:mode k :enc \"UTF-8\"}, - a single string is equivelent to {:mode :lines, :enc s}. + a single string is equivalent to {:mode :lines, :enc s}. In :bytes mode, values are bytes array. In :lines mode, values are strings representing lines without line delimiters. In :text mode, values are strings." {:arglists '([cmd arg1 ... argN & opts])} [& args] (reify - clojure.lang.IReduce + IReduce (reduce [self rf] (reduce rf (eduction self nil))) ; quick way to handle no init (reduce [self rf init] (let [xf (self rf)] (xf init))) - clojure.lang.Fn - clojure.lang.IFn + Fn + IFn (invoke [_ rf] (let [[cmd [& {:as opts :keys [env in out dir] :or {dir sh/*sh-dir*}}]] (split-with string? args) env (into (or sh/*sh-env* {}) env) env (into {} (for [[k v] env] [(name k) (str v)])) - proc (-> ^java.util.List (map str cmd) ProcessBuilder. - (.redirectError java.lang.ProcessBuilder$Redirect/INHERIT) + proc (-> ^List (map str cmd) ProcessBuilder. + (.redirectError ProcessBuilder$Redirect/INHERIT) (doto (-> .environment (.putAll env))) (.directory (io/as-file dir)) .start) EOS (Object.) - q (java.util.concurrent.ArrayBlockingQueue. 16) + q (ArrayBlockingQueue. 16) drain (fn [acc] (loop [acc acc] (if-some [x (.poll q)] @@ -146,41 +151,41 @@ acc))) in (stream-spec in) out (stream-spec out) - stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (java.io.OutputStreamWriter. (:enc in)) java.io.BufferedWriter.)) - stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (java.io.InputStreamReader. (:enc out)) java.io.BufferedReader.)) + ^Closeable stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (OutputStreamWriter. ^String (:enc in)) BufferedWriter.)) + stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (InputStreamReader. ^String (:enc out)) BufferedReader.)) write! (case (:mode in) :lines (fn [x] - (doto ^java.io.BufferedWriter stdin + (doto ^BufferedWriter stdin (.write (str x)) .newLine)) :text (fn [x] - (.write ^java.io.BufferedWriter stdin (str x))) + (.write ^BufferedWriter stdin (str x))) :bytes (fn [^bytes x] - (.write ^java.io.OutputStream stdin x)))] + (.write ^OutputStream stdin x)))] (-> (case (:mode out) :lines #(loop [] - (if-some [s (.readLine ^java.io.BufferedReader stdout)] + (if-some [s (.readLine ^BufferedReader stdout)] (do (.put q s) (recur)) (.put q EOS))) :text #(let [buf (char-array 1024)] (loop [] - (let [n (.read ^java.io.BufferedReader stdout buf)] + (let [n (.read ^BufferedReader stdout buf)] (if (neg? n) (.put q EOS) (do (.put q (String. buf 0 n)) (recur)))))) :bytes #(let [buf (byte-array 1024)] (loop [] - (let [n (.read ^java.io.InputStream stdout buf)] + (let [n (.read ^InputStream stdout buf)] (if (neg? n) (.put q EOS) - (do (.put q (java.util.Arrays/copyOf buf n)) (recur))))))) + (do (.put q (Arrays/copyOf buf n)) (recur))))))) Thread. .start) (fn ([] (rf)) @@ -197,5 +202,5 @@ (when-not (reduced? acc) (write! x)) acc - (catch java.io.IOException e + (catch IOException e (ensure-reduced acc)))))))))) \ No newline at end of file