[#6] async pod vars using low level invoke (#7)

This commit is contained in:
Michiel Borkent 2020-05-20 11:53:10 +02:00 committed by GitHub
parent 8f16139ab6
commit a673ab4c8a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 493 additions and 67 deletions

374
README.md
View file

@ -1,10 +1,60 @@
# babashka.pods
A library to load babashka pods. Used by babashka but also usable from the JVM
or other [sci](https://github.com/borkdude/sci)-based projects.
This is the library to load babashka pods. It is used by
[babashka](https://github.com/borkdude/babashka/) but also usable from the JVM
and [sci](https://github.com/borkdude/sci)-based projects other than babashka.
More information about babashka pods can be found
[here](https://github.com/borkdude/babashka/blob/master/doc/pods.md).
## Introduction
Pods are standalone programs that can expose namespaces with vars to babashka or
a JVM. Pods can be built in Clojure, but also in languages that don't run on the
JVM.
Some terminology:
- _pod_: a program that exposes namespaces with vars via the _pod protocol_.
- _pod client_: the program invoking a pod. When babashka invokes a pod,
babashka is the pod client. When a JVM invokes a pod, the JVM is the pod client.
- _message_: a message sent from the pod client to the pod or vice versa,
encoded in [bencode](https://en.wikipedia.org/wiki/Bencode) format.
- _payload_: a particular field of a _message_ encoded in a _payload format_
(currently only JSON or EDN). Examples are `args`, `value` and `ex-data`. _
- _pod protocol_: the documented way of exchanging messages between a _pod
client_ and _pod_.
Pods can be created independently from pod clients. Any program can be invoked
as a pod as long as it implements the _pod protocol_. This protocol is
influenced by and built upon battle-tested technologies:
- the [nREPL](https://nrepl.org/) and [LSP](https://microsoft.github.io/language-server-protocol/) protocols
- [bencode](https://en.wikipedia.org/wiki/Bencode)
- [JSON](https://www.json.org/json-en.html)
- [EDN](https://github.com/edn-format/edn)
- composition of UNIX command line tools in via good old stdin and stdout
Currently the following pods are available:
- [clj-kondo](https://github.com/borkdude/clj-kondo/#babashka-pod): a Clojure
linter
- [pod-babashka-filewatcher](https://github.com/babashka/pod-babashka-filewatcher): a
filewatcher pod based on Rust notify.
- [pod-babashka-hsqldb](https://github.com/babashka/pod-babashka-hsqldb): a pod
that allows you to create and fire queries at a
[HSQLDB](http://www.hsqldb.org/) database.
- [pod-jaydeesimon-jsoup](https://github.com/jaydeesimon/pod-jaydeesimon-jsoup):
a pod for parsing HTML using CSS queries backed by Jsoup.
- [pod-lispyclouds-docker](https://github.com/lispyclouds/pod-lispyclouds-docker):
A pod for interacting with docker
The name pod is inspired by [boot's pod
feature](https://github.com/boot-clj/boot/wiki/Pods). It means _underneath_ or
_below_ in Polish and Russian. In Romanian it means _bridge_
([source](https://en.wiktionary.org/wiki/pod)).
## Status
The protocol should be considered alpha. Breaking changes may occur at this
phase and will be documented in `CHANGELOG.md`.
## Usage
@ -23,9 +73,10 @@ On the JVM:
;;=> [#:next.jdbc{:update-count 0}]
```
From the [Small Clojure Interpreter](https://github.com/borkdude/sci):
## Sci
See [test/babashka/pods/sci_test.clj](test/babashka/pods/sci_test.clj).
To use pods in a [sci](https://github.com/borkdude/sci) based project, see
[test/babashka/pods/sci_test.clj](test/babashka/pods/sci_test.clj).
## Why JVM support?
@ -35,8 +86,319 @@ light weight replacement for native interop (JNI, JNA, etc.).
- When developing pods, this library can be used to test them on the JVM.
## Implementing your own pod
### Examples
Beyond the already available pods mentioned above, eductional examples of pods
can be found [here](../examples/pods):
- [pod-lispyclouds-sqlite](../examples/pods/pod-lispyclouds-sqlite): a pod that
allows you to create and fire queries at a [sqlite](https://www.sqlite.org/)
database. Implemented in Python.
### Libraries
If you are looking for libraries to deal with bencode, JSON or EDN, take a look
at the existing pods or [nREPL](https://nrepl.org/nrepl/beyond_clojure.html)
implementations for various languages.
### Naming
When choosing a name for your pod, we suggest the following naming scheme:
```
pod-<user-id>-<pod-name>
```
where `<user-id>` is your Github or Gitlab handle and `<pod-name>` describes
what your pod is about.
Examples:
- [pod-lispyclouds-sqlite](../examples/pods/pod-lispyclouds-sqlite): a pod to
communicate with [sqlite](https://www.sqlite.org/), provided by
[@lispyclouds](https://github.com/lispyclouds).
Pods created by the babashka maintainers use the identifier `babashka`:
- [pod-babashka-hsqldb](https://github.com/borkdude/pod-babashka-hsqldb): a pod
to communicate with [HSQLDB](http://www.hsqldb.org/)
### The protocol
#### Message and payload format
Exchange of _messages_ between pod client and the pod happens in the
[bencode](https://en.wikipedia.org/wiki/Bencode) format. Bencode is a bare-bones
format that only has four types:
- integers
- lists
- dictionaries (maps)
- byte strings
Additionally, _payloads_ like `args` (arguments) or `value` (a function return
value) are encoded in either JSON or EDN.
So remember: messages are in bencode, payloads (particular fields in the
message) are in either JSON or EDN.
Bencode is chosen as the message format because it is a light-weight format
which can be implemented in 200-300 lines of code in most languages. If pods are
implemented in Clojure, they only need to depend on the
[bencode](https://github.com/nrepl/bencode) library and use `pr-str` and
`edn/read-string` for encoding and decoding payloads.
Why isn't EDN or JSON chosen as the message format instead of bencode, you may
ask. Assuming EDN or JSON as the message and payload format for all pods is too
constraining: other languages might already have built-in JSON support and there
might not be a good EDN library available. So we use bencode as the first
encoding and choose one of multiple richer encodings on top of this. More
payload formats might be added in the future (e.g. transit).
When calling the `babashka.pods/load-pod` function, the pod client will start
the pod and leave the pod running throughout the duration of a babashka script.
#### describe
The first message that the pod client will send to the pod on its stdin is:
``` clojure
{"op" "describe"}
```
Encoded in bencode this looks like:
``` clojure
(bencode/write-bencode System/out {"op" "describe"})
;;=> d2:op8:describee
```
The pod should reply to this request with a message in the vein of:
``` clojure
{"format" "json"
"namespaces"
[{"name" "pod.lispyclouds.sqlite"
"vars" [{"name" "execute!"}]}]
"ops" {"shutdown" {}}}
```
In this reply, the pod declares that payloads will be encoded and decoded using
JSON. It also declares that the pod exposes one namespace,
`pod.lispyclouds.sqlite` with one var `execute!`.
The pod encodes the above map to bencode and writes it to stdoud. The pod client
reads this message from the pod's stdout.
Upon receiving this message, the pod client creates these namespaces and vars.
The optional `ops` value communicates which ops the pod supports, beyond
`describe` and `invoke`. It is a map of op names to option maps. In the above
example the pod declares that it supports the `shutdown` op. Since the
`shutdown` op does not need any additional options right now, the value is an
empty map.
As a pod user, you can load the pod with:
``` clojure
(require '[babashka.pods :as pods])
(pods/load-pod "pod-lispyclouds-sqlite")
(some? (find-ns 'pod.lispyclouds.sqlite)) ;;=> true
;; yay, the namespace exists!
;; let's give the namespace an alias
(require '[pod.lispyclouds.sqlite :as sql])
```
#### invoke
When invoking a var that is related to the pod, let's call it a _proxy var_, the
pod client reaches out to the pod with the arguments encoded in JSON or EDN. The
pod will then respond with a return value encoded in JSON or EDN. The pod client
will then decode the return value and present the user with that.
Example: the user invokes `(sql/execute! "select * from foo")`. The pod client
sends this message to the pod:
``` clojure
{"id" "1d17f8fe-4f70-48bf-b6a9-dc004e52d056"
"var" "pod.lispyclouds.sqlite/execute!"
"args" "[\"select * from foo\"]"
```
The `id` is unique identifier generated by the pod client which correlates this
request with a response from the pod.
An example response from the pod could look like:
``` clojure
{"id" "1d17f8fe-4f70-48bf-b6a9-dc004e52d056"
"value" "[[1] [2]]"
"status" "[\"done\"]"}
```
Here, the `value` payload is the return value of the function invocation. The
field `status` contains `"done"`. This tells the pod client that this is the last
message related to the request with `id` `1d17f8fe-4f70-48bf-b6a9-dc004e52d056`.
Now you know most there is to know about the pod protocol!
#### shutdown
When the pod client is about to exit, it sends an `{"op" "shutdown"}` message, if the
pod has declared that it supports it in the `describe` response. Then it waits
for the pod process to end. This gives the pod a chance to clean up resources
before it exits. If the pod does not support the `shutdown` op, the pod process
is killed by the pod client.
#### out and err
Pods may send messages with an `out` and `err` string value. The Pod Client prints
these messages to `*out*` and `*err*`. Stderr from the pod is redirected to
`System/err`.
``` clojure
{"id" "1d17f8fe-4f70-48bf-b6a9-dc004e52d056"
"out" "hello"}
```
``` clojure
{"id" "1d17f8fe-4f70-48bf-b6a9-dc004e52d056"
"err" "debug"}
```
#### Error handling
Responses may contain an `ex-message` string and `ex-data` payload string (JSON
or EDN) along with an `"error"` value in `status`. This will cause the pod client to
throw an `ex-info` with the associated values.
Example:
``` clojure
{"id" "1d17f8fe-4f70-48bf-b6a9-dc004e52d056"
"ex-message" "Illegal input"
"ex-data" "{\"input\": 10}
"status" "[\"done\", \"error\"]"}
```
#### Environment
The pod client will set the `BABASHKA_POD` environment variable to `true` when
invoking the pod. This can be used by the invoked program to determine whether
it should behave as a pod or not.
Added in v0.0.94.
#### Client side code
Pods may implement functions and macros by sending arbitrary code to the pod
client in a `"code"` field as part of a `"var"` section. The code is evaluated
by the pod client inside the declared namespace.
For example, a pod can define a macro called `do-twice`:
``` clojure
{"format" "json"
"namespaces"
[{"name" "pod.babashka.demo"
"vars" [{"name" "do-twice" "code" "(defmacro do-twice [x] `(do ~x ~x))"}]}]}
```
In the pod client:
``` clojure
(pods/load-pod "pod-babashka-demo")
(require '[pod.babashka.demo :as demo])
(demo/do-twice (prn :foo))
;;=>
:foo
:foo
nil
```
#### Async
Asynchronous functions can be implemented using callbacks.
The pod will first declare a wrapper function accepting user provided callbacks
as client side code. An example from the
[filewatcher](https://github.com/babashka/pod-babashka-filewatcher) pod:
``` clojure
(defn watch
([path cb] (watch path cb {}))
([path cb opts]
(babashka.pods/invoke "pod.babashka.filewatcher"
'pod.babashka.filewatcher/watch*
[path opts]
{:on-success (fn [{:keys [:value :done]}] (cb value))
:on-error (fn [{:keys [:ex-message :ex-data]}]
(binding [*out* *err*]
(println "ERROR:" ex-message)))})
nil))
```
The wrapper function will then invoke `babashka.pods/invoke`, a lower level
function to invoke a pod var with callbacks.
The arguments to `babashka.pods/invoke` are:
- a pod identifier string, either explicitly set as `pod-id` in `describe`, or
derived from the first described namespace.
- the symbol of the var to invoke
- the arguments to the var
- an opts map containing `:on-success` and `:on-error` callbacks.
The return value of `babashka.pods/invoke` is a map containing `:result`. When
not using callbacks, this is the return value from the pod var invocation. When
using callbacks, this value is undefined.
The callback `:on-success` is called with a map containing:
- `:value`: a return value from the pod var
- `:done`: a boolean indicating if the var invocation is done (`true`). If
`false` then more values can be expected.
The callback `:on-error` is called with a map containing:
- `:ex-message`: an error message
- `:ex-data`: an arbitrary additional error data map. Typically it will contain
`:type` describing the type of exception that happened in the pod.
If desired, `:ex-message` and `:ex-data` can be reified into a
`java.lang.Exception` using `ex-info`.
In the above example the wrapper function calls the pod identified by
`"pod.babashka.filewatcher"`. It calls the var
`pod.babashka.filewatcher/watch*`. In `:on-success` it pulls out received
values, passing them to the user-provided callback. Additionally, it prints any
errors received from the pod library in `:on-error` to `*err*`.
A user will then use `pod.babashka.filewatcher/watch` like this:
``` clojure
$ clj
Clojure 1.10.1
user=> (require '[babashka.pods :as pods])
nil
user=> (pods/load-pod "pod-babashka-filewatcher")
nil
user=> (require '[pod.babashka.filewatcher :as fw])
nil
user=> (fw/watch "/tmp" (fn [result] (prn "result" result)))
nil
user=> (spit "/tmp/foobar123.txt" "foo")
nil
user=> "result" {:path "/private/tmp/foobar123.txt", :type "create"}
```
## Run tests
To run the tests for the pods library:
```
$ script/test
```

View file

@ -4,3 +4,6 @@
(defn load-pod
([pod-spec] (load-pod pod-spec nil))
([pod-spec opts] (jvm/load-pod pod-spec opts)))
(defn invoke [pod-id sym args opts]
(jvm/invoke pod-id sym args opts))

View file

@ -57,31 +57,36 @@
status (set (map (comp keyword bytes->string) status))
done? (contains? status :done)
error? (contains? status :error)
value (if error?
(let [message (or (some-> (get reply "ex-message")
bytes->string)
"")
data (or (some-> (get reply "ex-data")
bytes->string
read-fn)
{})]
(ex-info message data))
value)
[ex-message ex-data]
(when error?
[(or (some-> (get reply "ex-message")
bytes->string)
"")
(or (some-> (get reply "ex-data")
bytes->string
read-fn)
{})])
chan (get @chans id)
promise? (instance? clojure.lang.IPending chan)
exception (when (and promise? error?)
(ex-info ex-message ex-data))
on-success (:on-success chan)
on-error (:on-error chan)
out (some-> (get reply "out")
bytes->string)
err (some-> (get reply "err")
bytes->string)]
(when (or value* error?)
(if promise?
(deliver chan value)
(async/put! chan value)))
(when (or done? error?)
(if promise?
(deliver chan nil) ;; some ops don't receive a value but are
;; still done.
(async/close! chan)))
(cond promise?
(deliver chan (if error? exception value))
(and (not error?) on-success)
(on-success {:value value
:done done?})
(and error? on-error)
(on-error {:ex-message ex-message
:ex-data ex-data})))
(when (and (or done? error?) promise?)
(deliver chan nil))
(when out
(binding [*out* out-stream]
(println out)))
@ -95,7 +100,10 @@
(defn next-id []
(str (java.util.UUID/randomUUID)))
(defn invoke [pod pod-var args async?]
(defn invoke [pod pod-var args
{:keys [:on-success
:on-error
:async]}]
(let [stream (:stdin pod)
format (:format pod)
chans (:chans pod)
@ -103,18 +111,25 @@
:edn pr-str
:json cheshire/generate-string)
id (next-id)
chan (if async? (async/chan)
(promise))
chan (cond async (async/chan)
(or on-success
on-error) {:on-success on-success
:on-error on-error}
:else (promise))
_ (swap! chans assoc id chan)
_ (write stream {"id" id
"op" "invoke"
"var" (str pod-var)
"args" (write-fn args)})]
(if async? chan ;; TODO: https://blog.jakubholy.net/2019/core-async-error-handling/
(let [v @chan]
(if (instance? Throwable v)
(throw v)
v)))))
;; see: https://blog.jakubholy.net/2019/core-async-error-handling/
(cond async chan
(or on-success on-error) nil
:else (let [v @chan]
(if (instance? Throwable v)
(throw v)
v)))))
(def pods (atom {}))
(defn load-pod
([pod-spec] (load-pod pod-spec nil))
@ -133,6 +148,7 @@
reply (read stdout)
format (-> (get reply "format") bytes->string keyword)
ops (some->> (get reply "ops") keys (map keyword) set)
pod-id (get-maybe-string reply "pod-id")
pod {:process p
:pod-spec pod-spec
:stdin stdin
@ -150,6 +166,9 @@
(.waitFor p))
(.destroy p))))
pod-namespaces (get reply "namespaces")
pod-id (or pod-id (when-let [ns (first pod-namespaces)]
(get-string ns "name")))
pod (assoc pod :pod-id pod-id)
vars-fn (fn [ns-name-str vars]
(reduce
(fn [m var]
@ -163,7 +182,7 @@
(assoc m name-sym
(or code
(fn [& args]
(let [res (invoke pod sym args async?)]
(let [res (invoke pod sym args {:async async?})]
res))))))
{}
vars))
@ -174,5 +193,14 @@
vars (vars-fn name-str vars)]
(assoc namespaces name-sym vars)))
{}
pod-namespaces)]
(assoc pod :namespaces pod-namespaces))))
pod-namespaces)
pod (assoc pod :namespaces pod-namespaces)]
(swap! pods assoc pod-id pod)
pod)))
(defn lookup-pod [pod-id]
(get @pods pod-id))
(defn invoke-public [pod-id fn-sym args opts]
(let [pod (lookup-pod pod-id)]
{:result (invoke pod fn-sym args opts)}))

View file

@ -18,3 +18,6 @@
(load-string v)))))
(future (impl/processor pod))
nil)))
(defn invoke [pod-id sym args opts]
(impl/invoke-public pod-id sym args opts))

View file

@ -23,3 +23,6 @@
(sci/future (impl/processor pod))
nil)))
{:sci.impl/op :needs-ctx}))
(defn invoke [pod-id sym args opts]
(impl/invoke-public pod-id sym args opts))

View file

@ -73,10 +73,18 @@
args (read-string args)
args (read-fn args)]
(case var
pod.test-pod/add-sync (write
{"value" (write-fn (apply + args))
"id" id
"status" ["done"]})
pod.test-pod/add-sync
(try (let [ret (apply + args)]
(write
{"value" (write-fn ret)
"id" id
"status" ["done"]}))
(catch Exception e
(write
{"ex-data" (write-fn {:args args})
"ex-message" (.getMessage e)
"status" ["done" "error"]
"id" id})))
pod.test-pod/range-stream
(let [rng (apply range args)]
(doseq [v rng]

View file

@ -0,0 +1,38 @@
(require '[babashka.pods :as pods])
(prn (pods/load-pod ["clojure" "-A:test-pod"])) ;; should return nil
(require '[pod.test-pod :as pod])
(def stream-results (atom []))
(pods/invoke "pod.test-pod" 'pod.test-pod/range-stream [1 10]
{:on-success (fn [{:keys [:value]}]
(swap! stream-results conj value))})
(while (< (count @stream-results) 9))
(def ex-result
(try (pod.test-pod/error 1 2 3)
(catch clojure.lang.ExceptionInfo e
(str (ex-message e) " / " (ex-data e)))))
(pod.test-pod/print "hello" "print" "this" "debugging" "message")
(pod.test-pod/print-err "hello" "print" "this" "error")
(pod/do-twice (prn :foo))
(def callback-result (promise))
(pods/invoke "pod.test-pod" 'pod.test-pod/add-sync [1 2]
{:on-success (fn [{:keys [:value :done]}]
(when done (deliver callback-result value)))})
(def error-result (promise))
(pods/invoke "pod.test-pod" 'pod.test-pod/add-sync ["1" 2]
{:on-error (fn [m]
(deliver error-result m))})
[(pod/assoc {:a 1} :b 2)
(pod.test-pod/add-sync 1 2 3)
@stream-results
ex-result
(pod.test-pod/return-nil)
@callback-result
(:ex-message @error-result)
(:ex-data @error-result)]

View file

@ -13,8 +13,8 @@
(sci/eval-string
test-program
{:namespaces {'babashka.pods
{'load-pod pods/load-pod}
{'load-pod pods/load-pod
'invoke pods/invoke}
'clojure.core.async
{'<!! async/<!!}}}))]
(assertions out err ret)))

View file

@ -1,36 +1,17 @@
(ns babashka.pods.test-common
(:require [clojure.test :refer [is]]))
(:require [clojure.java.io :as io]
[clojure.test :refer [is]]))
(def test-program "
(require '[babashka.pods :as pods])
(require '[clojure.core.async :as async])
(prn (pods/load-pod [\"clojure\" \"-A:test-pod\"])) ;; should return nil
(require '[pod.test-pod :as pod])
(def stream-results (atom []))
(let [chan (pod.test-pod/range-stream 1 10)]
(loop []
(when-let [x (async/<!! chan)]
(swap! stream-results conj x)
(recur))))
(def ex-result
(try (pod.test-pod/error 1 2 3)
(catch clojure.lang.ExceptionInfo e
(str (ex-message e) \" / \" (ex-data e)))))
(pod.test-pod/print \"hello\" \"print\" \"this\" \"debugging\" \"message\")
(pod.test-pod/print-err \"hello\" \"print\" \"this\" \"error\")
(pod/do-twice (prn :foo))
[(pod/assoc {:a 1} :b 2)
(pod.test-pod/add-sync 1 2 3)
@stream-results
ex-result
(pod.test-pod/return-nil)]")
(def test-program (slurp (io/file "test-resources" "test_program.clj")))
(defn assertions [out err ret]
(is (= '[{:a 1, :b 2}
6
[1 2 3 4 5 6 7 8 9]
"Illegal arguments / {:args (1 2 3)}"
nil] ret))
nil
3
"java.lang.String cannot be cast to java.lang.Number"
{:args ["1" 2]}] ret))
(is (= "nil\n(\"hello\" \"print\" \"this\" \"debugging\" \"message\")\n:foo\n:foo\n" (str out)))
(is (= "(\"hello\" \"print\" \"this\" \"error\")\n" (str err))))