From 3edc5879ca65a97a52856b240c695378ee48ff5a Mon Sep 17 00:00:00 2001 From: Michiel Borkent Date: Sat, 9 May 2020 12:12:42 +0200 Subject: [PATCH] init --- .gitignore | 1 + deps.edn | 4 + src/babashka/pods/impl.clj | 153 +++++++++++++++++++++++++++++++++++++ src/babashka/pods/jvm.clj | 13 ++++ src/babashka/pods/sci.clj | 15 ++++ 5 files changed, 186 insertions(+) create mode 100644 .gitignore create mode 100644 deps.edn create mode 100644 src/babashka/pods/impl.clj create mode 100644 src/babashka/pods/jvm.clj create mode 100644 src/babashka/pods/sci.clj diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..304db9f --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.cpcache diff --git a/deps.edn b/deps.edn new file mode 100644 index 0000000..ca1e9f6 --- /dev/null +++ b/deps.edn @@ -0,0 +1,4 @@ +{:deps {borkdude/sci {:mvn/version "0.0.13-alpha.19"} + nrepl/bencode {:mvn/version "1.1.0"} + cheshire {:mvn/version "5.10.0"} + org.clojure/core.async {:mvn/version "1.1.587"}}} diff --git a/src/babashka/pods/impl.clj b/src/babashka/pods/impl.clj new file mode 100644 index 0000000..058d6d4 --- /dev/null +++ b/src/babashka/pods/impl.clj @@ -0,0 +1,153 @@ +(ns babashka.pods.impl + {:no-doc true} + (:refer-clojure :exclude [read]) + (:require [bencode.core :as bencode] + [cheshire.core :as cheshire] + [clojure.core.async :as async] + [clojure.edn :as edn] + [sci.core :as sci])) + +(set! *warn-on-reflection* true) + +(defn add-shutdown-hook! [^Runnable f] + (-> (Runtime/getRuntime) + (.addShutdownHook (Thread. f)))) + +(defn write [^java.io.OutputStream stream v] + (locking stream + (bencode/write-bencode stream v) + (.flush stream))) + +(defn read [stream] + (bencode/read-bencode stream)) + +(defn bytes->string [^"[B" bytes] + (String. bytes)) + +(defn get-string [m k] + (-> (get m k) + bytes->string)) + +(defn processor [pod] + (let [stdout (:stdout pod) + format (:format pod) + chans (:chans pod) + read-fn (case format + :edn edn/read-string + :json #(cheshire/parse-string-strict % true))] + (try + (loop [] + (let [reply (read stdout) + id (get reply "id") + id (bytes->string id) + value* (find reply "value") + value (some-> value* + second + bytes->string + read-fn) + status (get reply "status") + status (set (map (comp keyword bytes->string) status)) + done? (contains? status :done) + error? (contains? status :error) + value (if error? + (let [message (or (some-> (get reply "ex-message") + bytes->string) + "") + data (or (some-> (get reply "ex-data") + bytes->string + read-fn) + {})] + (ex-info message data)) + value) + chan (get @chans id) + out (some-> (get reply "out") + bytes->string) + err (some-> (get reply "err") + bytes->string)] + (when (or value* error?) (async/put! chan value)) + (when (or done? error?) (async/close! chan)) + (when out (binding [*out* @sci/out] + (println out))) + (when err (binding [*out* @sci/err] + (println err)))) + (recur)) + (catch Exception e + (binding [*out* @sci/err] + (prn e)))))) + +(defn next-id [] + (str (java.util.UUID/randomUUID))) + +(defn invoke [pod pod-var args async?] + (let [stream (:stdin pod) + format (:format pod) + chans (:chans pod) + write-fn (case format + :edn pr-str + :json cheshire/generate-string) + id (next-id) + chan (async/chan) + _ (swap! chans assoc id chan) + _ (write stream {"id" id + "op" "invoke" + "var" (str pod-var) + "args" (write-fn args)})] + (if async? chan ;; TODO: https://blog.jakubholy.net/2019/core-async-error-handling/ + (let [v (async/ (get reply "format") bytes->string keyword) + ops (some->> (get reply "ops") keys (map keyword) set) + pod {:process p + :pod-spec pod-spec + :stdin stdin + :stdout stdout + :chans (atom {}) + :format format + :ops ops} + _ (add-shutdown-hook! + (fn [] + (if (contains? ops :shutdown) + (do (write stdin {"op" "shutdown" + "id" (next-id)}) + (.waitFor p)) + (.destroy p)))) + pod-namespaces (get reply "namespaces") + vars-fn (fn [ns-name-str vars] + (reduce + (fn [m var] + (let [name (get-string var "name") + async? (some-> (get var "async") + bytes->string + #(Boolean/parseBoolean %)) + name-sym (symbol name) + sym (symbol ns-name-str name)] + (assoc m name-sym (fn [& args] + (let [res (invoke pod sym args async?)] + res))))) + {} + vars)) + pod-namespaces (reduce (fn [namespaces namespace] + (let [name-str (-> namespace (get "name") bytes->string) + name-sym (symbol name-str) + vars (get namespace "vars") + vars (vars-fn name-str vars)] + (assoc namespaces name-sym vars))) + {} + pod-namespaces)] + (assoc pod :namespaces pod-namespaces)))) diff --git a/src/babashka/pods/jvm.clj b/src/babashka/pods/jvm.clj new file mode 100644 index 0000000..c6ca85d --- /dev/null +++ b/src/babashka/pods/jvm.clj @@ -0,0 +1,13 @@ +(ns babashka.pods.jvm + (:require [babashka.pods.impl :as impl])) + +(defn load-pod + ([pod-spec] (load-pod pod-spec nil)) + ([pod-spec _opts] + (let [pod (impl/load-pod pod-spec _opts) + namespaces (:namespaces pod)] + (doseq [[ns-sym v] namespaces] + (create-ns ns-sym) + (doseq [[var-sym v] v] + (intern ns-sym var-sym v))) + (future (impl/processor pod))))) diff --git a/src/babashka/pods/sci.clj b/src/babashka/pods/sci.clj new file mode 100644 index 0000000..ea12fb7 --- /dev/null +++ b/src/babashka/pods/sci.clj @@ -0,0 +1,15 @@ +(ns babashka.pods.sci + (:require [babashka.pods.impl :as impl] + [sci.core :as sci])) + +(defn load-pod + ([ctx pod-spec] (load-pod ctx pod-spec nil)) + ([ctx pod-spec _opts] + (let [pod (impl/load-pod pod-spec _opts) + namespaces (:namespaces pod) + env (:env ctx)] + (swap! env + (fn [env] + update env :namespaces merge namespaces)) + (sci/future (impl/processor pod))))) +