From 899154c0df2b5e770d4d993fff457a5e628501ab Mon Sep 17 00:00:00 2001 From: Christophe Grand Date: Thu, 5 Oct 2017 13:23:40 +0200 Subject: [PATCH] xforms 0.11.0, with xio/sh to use any shell process as a transducer --- README.md | 12 ++++- project.clj | 2 +- src/net/cgrand/xforms/io.clj | 97 +++++++++++++++++++++++++++++++++++- 3 files changed, 107 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index a29057a..c0d78ad 100644 --- a/README.md +++ b/README.md @@ -4,19 +4,27 @@ More transducers and reducing functions for Clojure(script)! [![Build Status](https://travis-ci.org/cgrand/xforms.png?branch=master)](https://travis-ci.org/cgrand/xforms) -*Transducers* (in `net.cgrand.xforms`) can be classified in three groups: regular ones, higher-order ones +*Transducers* can be classified in three groups: regular ones, higher-order ones (which accept other transducers as arguments) and 1-item ones which emit only 1 item out no matter how many went in. 1-item transducers generally only make sense in the context of a higher-order transducer. +In `net.cgrand.xforms`: + * regular ones: `partition` (1 arg), `reductions`, `for`, `take-last`, `drop-last`, `window` and `window-by-time` * higher-order ones: `by-key`, `into-by-key`, `multiplex`, `transjuxt`, `partition` (2+ args) * 1-item ones: `reduce`, `into`, `transjuxt`, `last`, `count`, `avg`, `sd`, `min`, `minimum`, `max`, `maximum`, `str` + +In `net.cgrand.xforms.io`: + * `sh` to use any process as a transducer + *Reducing functions* * in `net.cgrand.xforms.rfs`: `min`, `minimum`, `max`, `maximum`, `str`, `str!`, `avg`, `sd`, `last` and `some`. * in `net.cgrand.xforms.io`: `line-out` and `edn-out`. +(in `net.cgrand.xforms`) + *Transducing contexts*: * in `net.cgrand.xforms`: `transjuxt` (for performing several transductions in a single pass), `into`, `count` and `some`. @@ -29,7 +37,7 @@ More transducers and reducing functions for Clojure(script)! Add this dependency to your project: ```clj -[net.cgrand /xforms "0.10.2"] +[net.cgrand /xforms "0.11.0"] ``` ```clj diff --git a/project.clj b/project.clj index 03ea792..7518399 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject net.cgrand/xforms "0.10.2" +(defproject net.cgrand/xforms "0.11.0" :description "Extra transducers for Clojure" :url "https://github.com/cgrand/xforms" :license {:name "Eclipse Public License" diff --git a/src/net/cgrand/xforms/io.clj b/src/net/cgrand/xforms/io.clj index 8d1f6dd..48f3149 100644 --- a/src/net/cgrand/xforms/io.clj +++ b/src/net/cgrand/xforms/io.clj @@ -1,5 +1,6 @@ (ns net.cgrand.xforms.io (:require [clojure.java.io :as io] + [clojure.java.shell :as sh] [clojure.edn :as edn])) (defn keep-opts [m like] @@ -93,4 +94,98 @@ w)) ([out xform coll & opts] (with-open [w (apply io/writer out opts)] - (transduce xform edn-out w coll)))) \ No newline at end of file + (transduce xform edn-out w coll)))) + +(defn- stream-spec [x] + (into {:mode :lines :enc "UTF-8"} + (cond (map? x) x (string? x) {:enc x} (keyword? x) {:mode x}))) + +(defn sh + "Transducer. Spawns a process (program cmd with optional arguments arg1 ... argN) and pipes data through it. + Options may be: + * :env, an environment variables map, it will be merged with clojure.java.shell/*sh-env* and JVM environment (in decreasing precedence order), + * :dir, the current dir (defaults to clojure.java.shell/*sh-dir* or JVM current dir), + * :in and :out which are maps with keys :mode (:lines (default), :text or :bytes) and :enc (defaults to \"UTF-8\"); + encoding applies only for modes :lines or :text; shorthands exist: a single keyword is equivalent to {:mode k :enc \"UTF-8\"}, + a single string is equivelent to {:mode :lines, :enc s}. + In :bytes mode, values are bytes array. + In :lines mode, values are strings representing lines without line delimiters. + In :text mode, values are strings." + {:arglists '([cmd arg1 ... argN & opts])} + [& args] + (fn [rf] + (let [[cmd [& {:as opts :keys [env in out dir] :or {dir sh/*sh-dir*}}]] (split-with string? args) + env (into (or sh/*sh-env* {}) env) + env (into {} (for [[k v] env] [(name k) (str v)])) + proc (-> ^java.util.List (map str cmd) ProcessBuilder. + (.redirectError java.lang.ProcessBuilder$Redirect/INHERIT) + (doto (-> .environment (.putAll env))) + (.directory (io/as-file dir)) + .start) + EOS (Object.) + q (java.util.concurrent.ArrayBlockingQueue. 16) + drain (fn [acc] + (loop [acc acc] + (if-some [x (.poll q)] + (let [acc (if (identical? EOS x) (reduced acc) (rf acc x))] + (if (reduced? acc) + (do + (.destroy proc) + acc) + (recur acc))) + acc))) + in (stream-spec in) + out (stream-spec out) + stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (java.io.OutputStreamWriter. (:enc in)) java.io.BufferedWriter.)) + stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (java.io.InputStreamReader. (:enc out)) java.io.BufferedReader.)) + write! + (case (:mode in) + :lines + (fn [x] + (doto ^java.io.BufferedWriter stdin + (.write (str x)) + .newLine)) + :text + (fn [x] + (.write ^java.io.BufferedWriter stdin (str x))) + :bytes + (fn [^bytes x] + (.write ^java.io.OutputStream stdin x)))] + (-> (case (:mode out) + :lines + #(loop [] + (if-some [s (.readLine ^java.io.BufferedReader stdout)] + (do (.put q s) (recur)) + (.put q EOS))) + :text + #(let [buf (char-array 1024)] + (loop [] + (let [n (.read ^java.io.BufferedReader stdout buf)] + (if (neg? n) + (.put q EOS) + (do (.put q (String. buf 0 n)) (recur)))))) + :bytes + #(let [buf (byte-array 1024)] + (loop [] + (let [n (.read ^java.io.InputStream stdout buf)] + (if (neg? n) + (.put q EOS) + (do (.put q (java.util.Arrays/copyOf buf n)) (recur))))))) + Thread. .start) + (fn + ([] (rf)) + ([acc] + (.close stdin) + (loop [acc acc] + (let [acc (drain acc)] + (if (reduced? acc) + (rf (unreduced acc)) + (recur acc))))) + ([acc x] + (let [acc (drain acc)] + (try + (when-not (reduced? acc) + (write! x)) + acc + (catch java.io.IOException e + (ensure-reduced acc))))))))) \ No newline at end of file