[#847] Better error handling for parallel tasks

This commit is contained in:
Michiel Borkent 2021-05-18 12:39:21 +02:00 committed by GitHub
parent 00091136d3
commit e18f4302a8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 118 additions and 38 deletions

View file

@ -3,12 +3,17 @@
[babashka.impl.common :refer [ctx bb-edn debug]] [babashka.impl.common :refer [ctx bb-edn debug]]
[babashka.impl.deps :as deps] [babashka.impl.deps :as deps]
[babashka.process :as p] [babashka.process :as p]
[clojure.core.async :refer [chan <!! alts!! thread]]
[clojure.java.io :as io] [clojure.java.io :as io]
[clojure.string :as str] [clojure.string :as str]
[rewrite-clj.node :as node] [rewrite-clj.node :as node]
[rewrite-clj.parser :as parser] [rewrite-clj.parser :as parser]
[rewrite-clj.zip :as zip] [rewrite-clj.zip :as zip]
[sci.core :as sci])) [sci.core :as sci])
(:import [clojure.core.async.impl.channels ManyToManyChannel]))
(defn chan? [x]
(instance? ManyToManyChannel x))
(def sci-ns (sci/create-ns 'babashka.tasks nil)) (def sci-ns (sci/create-ns 'babashka.tasks nil))
(def default-log-level :error) (def default-log-level :error)
@ -118,8 +123,13 @@
(defn -wait [res] (defn -wait [res]
(when res (when res
(if (future? res) (if (chan? res)
@res (let [[_task-name res] (<!! res)]
(if (instance? Throwable res)
(throw (ex-info (ex-message res)
{:babashka/exit 1
:data (ex-data res)}))
res))
res))) res)))
(defn depends-map [tasks target-name] (defn depends-map [tasks target-name]
@ -127,13 +137,21 @@
m [target-name deps]] m [target-name deps]]
(into {} (cons m (map #(depends-map tasks %) deps))))) (into {} (cons m (map #(depends-map tasks %) deps)))))
(defmacro -err-thread [name & body]
`(clojure.core.async/thread
(try [~name ~@body]
(catch Throwable e#
[~name (ex-info (str "Error in task: " ~name
"\n" (ex-message e#))
(or (ex-data e#) {}))]))))
(defn wrap-body [task-map prog parallel?] (defn wrap-body [task-map prog parallel?]
(format "(binding [ (format "(binding [
babashka.tasks/*task* '%s] babashka.tasks/*task* '%s]
%s)" %s)"
(pr-str task-map) (pr-str task-map)
(if parallel? (if parallel?
(format "(future %s)" prog) (format "(babashka.tasks/-err-thread \"%s\" %s)" (:name task-map) prog)
prog))) prog)))
(defn wrap-def [task-map prog parallel? last?] (defn wrap-def [task-map prog parallel? last?]
@ -144,8 +162,24 @@
(format "(babashka.tasks/-wait %s)" task-name) (format "(babashka.tasks/-wait %s)" task-name)
task-name)))) task-name))))
(defn deref-task [dep] (defn wait-tasks [deps]
(format "(def %s (babashka.tasks/-wait %s))" dep dep)) (if deps
(format "
(let [chans %s]
(loop [cs chans]
(let [[v p] (clojure.core.async/alts!! cs)
[task-name v] v
cs (filterv #(not= p %%) cs)
;; _ (.println System/err (str \"n: \" task-name \" v: \" v))
_ (intern *ns* (symbol task-name) v)]
(when (instance? Throwable v)
(throw (ex-info (ex-message v)
{:babashka/exit 1
:data (ex-data v)})))
(when (seq cs)
(recur cs)))))" deps)
"")
#_(format "(def %s (babashka.tasks/-wait %s))" dep dep))
(defn wrap-enter-leave [task-name prog enter leave] (defn wrap-enter-leave [task-name prog enter leave]
(str (pr-str enter) "\n" (str (pr-str enter) "\n"
@ -159,7 +193,7 @@
(defn wrap-depends [prog depends parallel?] (defn wrap-depends [prog depends parallel?]
(if parallel? (if parallel?
(format "(do %s)" (str (str/join "\n" (map deref-task depends)) "\n" prog)) (format "(do %s)" (str (str "\n" (wait-tasks depends)) "\n" prog))
prog)) prog))
(defn assemble-task-1 (defn assemble-task-1
@ -312,7 +346,7 @@
(println "No such task:" t)) 1]) (println "No such task:" t)) 1])
(if-let [task (get tasks t)] (if-let [task (get tasks t)]
(let [prog (str prog "\n" (let [prog (str prog "\n"
(apply str (map deref-task depends)) #_(wait-tasks depends) #_(apply str (map deref-task depends))
"\n" "\n"
(assemble-task-1 task-map task parallel? true)) (assemble-task-1 task-map task parallel? true))
extra-paths (concat extra-paths (:extra-paths task)) extra-paths (concat extra-paths (:extra-paths task))
@ -413,6 +447,7 @@
{'shell (sci/copy-var shell sci-ns) {'shell (sci/copy-var shell sci-ns)
'clojure (sci/copy-var clojure sci-ns) 'clojure (sci/copy-var clojure sci-ns)
'-wait (sci/copy-var -wait sci-ns) '-wait (sci/copy-var -wait sci-ns)
'-err-thread (sci/copy-var -err-thread sci-ns)
'*task* task '*task* task
'current-task current-task 'current-task current-task
'current-state state 'current-state state

View file

@ -0,0 +1,23 @@
{:tasks
{coffeep {:depends [groundsp hot-waterp filterp mugp]
:task (do (Thread/sleep 300)
[:made-coffee [groundsp hot-waterp filterp mugp]])}
groundsp {:depends [beansp]
:task (do
(Thread/sleep 200)
[:ground-beans [beansp]])}
hot-waterp {:depends [waterp]
:task (do (Thread/sleep 200)
[:heated-water [waterp]])}
filterp {:task (do
(Thread/sleep 100)
:filter)}
mugp {:task (do
(Thread/sleep 100)
:mug)}
waterp {:task (do
(Thread/sleep 100)
:poured-water)}
beansp {:task (do
(Thread/sleep 100)
:measured-beans)}}}

View file

@ -70,15 +70,15 @@
"ls foobar"))}} "ls foobar"))}}
(is (= 1337 (bb "run" "--prn" "foo")))) (is (= 1337 (bb "run" "--prn" "foo"))))
(test-utils/with-config (test-utils/with-config
{:tasks {'foo (list '-> (list 'shell {:out out {:tasks {'foo (list '-> (list 'shell {:out out
:err out :err out
:error-fn :error-fn
'(fn [opts] '(fn [opts]
(and (:task opts) (and (:task opts)
(:proc opts) (:proc opts)
(not (zero? (:exit (:proc opts))))))} (not (zero? (:exit (:proc opts))))))}
"ls foobar"))}} "ls foobar"))}}
(is (true? (bb "run" "--prn" "foo"))))) (is (true? (bb "run" "--prn" "foo")))))
(fs/delete out) (fs/delete out)
(testing "clojure test" (testing "clojure test"
(test-utils/with-config {:tasks {'foo (list 'clojure {:out out} (test-utils/with-config {:tasks {'foo (list 'clojure {:out out}
@ -167,27 +167,27 @@
(testing "no such task" (testing "no such task"
(test-utils/with-config '{:tasks {a (+ 1 2 3)}} (test-utils/with-config '{:tasks {a (+ 1 2 3)}}
(is (thrown-with-msg? (is (thrown-with-msg?
Exception #"No such task: b" Exception #"No such task: b"
(bb "run" "b"))))) (bb "run" "b")))))
(testing "unresolved dependency" (testing "unresolved dependency"
(test-utils/with-config '{:tasks {a (+ 1 2 3) (test-utils/with-config '{:tasks {a (+ 1 2 3)
b {:depends [x] b {:depends [x]
:task (+ a 4 5 6)}}} :task (+ a 4 5 6)}}}
(is (thrown-with-msg? (is (thrown-with-msg?
Exception #"No such task: x" Exception #"No such task: x"
(bb "run" "b"))))) (bb "run" "b")))))
(testing "cyclic task" (testing "cyclic task"
(test-utils/with-config '{:tasks {b {:depends [b] (test-utils/with-config '{:tasks {b {:depends [b]
:task (+ a 4 5 6)}}} :task (+ a 4 5 6)}}}
(is (thrown-with-msg? (is (thrown-with-msg?
Exception #"Cyclic task: b" Exception #"Cyclic task: b"
(bb "run" "b")))) (bb "run" "b"))))
(test-utils/with-config '{:tasks {c {:depends [b]} (test-utils/with-config '{:tasks {c {:depends [b]}
b {:depends [c] b {:depends [c]
:task (+ a 4 5 6)}}} :task (+ a 4 5 6)}}}
(is (thrown-with-msg? (is (thrown-with-msg?
Exception #"Cyclic task: b" Exception #"Cyclic task: b"
(bb "run" "b"))))) (bb "run" "b")))))
(testing "doc" (testing "doc"
(test-utils/with-config '{:tasks {b {:doc "Beautiful docstring"}}} (test-utils/with-config '{:tasks {b {:doc "Beautiful docstring"}}}
(let [s (test-utils/bb nil "doc" "b")] (let [s (test-utils/bb nil "doc" "b")]
@ -203,12 +203,34 @@
:out)}} :out)}}
(let [s (bb "run" "--prn" "a")] (let [s (bb "run" "--prn" "a")]
(is (= "hello\n" s))))) (is (= "hello\n" s)))))
(testing "parallel test"
(test-utils/with-config (edn/read-string (slurp "test-resources/coffee-tasks.edn"))
(let [tree [:made-coffee [[:ground-beans [:measured-beans]] [:heated-water [:poured-water]] :filter :mug]]
t0 (System/currentTimeMillis)
s (bb "run" "--prn" "coffeep")
t1 (System/currentTimeMillis)
delta-sequential (- t1 t0)]
(is (= tree s))
(test-utils/with-config (edn/read-string (slurp "test-resources/coffee-tasks.edn"))
(let [t0 (System/currentTimeMillis)
s (bb "run" "--parallel" "--prn" "coffeep")
t1 (System/currentTimeMillis)
delta-parallel (- t1 t0)]
(is (= tree s))
(is (< delta-parallel delta-sequential))))))
(testing "exception"
(test-utils/with-config '{:tasks {a (Thread/sleep 10000)
b (do (Thread/sleep 10)
(throw (ex-info "0 noes" {})))
c {:depends [a b]}}}
(is (thrown-with-msg? Exception #"0 noes"
(bb "run" "--parallel" "c")))))))
(deftest list-tasks-test (deftest list-tasks-test
(test-utils/with-config {} (test-utils/with-config {}
(let [res (test-utils/bb nil "tasks")] (let [res (test-utils/bb nil "tasks")]
(is (str/includes? res "No tasks found.")))) (is (str/includes? res "No tasks found."))))
(test-utils/with-config "{:paths [\"test-resources/task_scripts\"] (test-utils/with-config "{:paths [\"test-resources/task_scripts\"]
:tasks {:requires ([tasks :as t]) :tasks {:requires ([tasks :as t])
task1 task1
{:doc \"task1 doc\" {:doc \"task1 doc\"
@ -226,16 +248,16 @@
baz non-existing/bar baz non-existing/bar
quux {:requires ([tasks :as t2]) quux {:requires ([tasks :as t2])
:task t2/foo}}}" :task t2/foo}}}"
(let [res (test-utils/bb nil "tasks")] (let [res (test-utils/bb nil "tasks")]
(is (= "The following tasks are available:\n\ntask1 task1 doc\ntask2 task2 doc\nfoo Foo docstring\nbar Foo docstring\nbaz \nquux Foo docstring\n" (is (= "The following tasks are available:\n\ntask1 task1 doc\ntask2 task2 doc\nfoo Foo docstring\nbar Foo docstring\nbaz \nquux Foo docstring\n"
res)))) res))))
(testing ":tasks is the first node" (testing ":tasks is the first node"
(test-utils/with-config "{:tasks {task1 (test-utils/with-config "{:tasks {task1
{:doc \"task1 doc\" {:doc \"task1 doc\"
:task (+ 1 2 3)}}}" :task (+ 1 2 3)}}}"
(let [res (test-utils/bb nil "tasks")] (let [res (test-utils/bb nil "tasks")]
(is (= "The following tasks are available:\n\ntask1 task1 doc\n" (is (= "The following tasks are available:\n\ntask1 task1 doc\n"
res))))))) res))))))
(deftest task-priority-test (deftest task-priority-test
(when-not test-utils/native? (when-not test-utils/native?