[#847] Better error handling for parallel tasks (2)
This commit is contained in:
parent
e18f4302a8
commit
9f48048472
2 changed files with 20 additions and 6 deletions
|
|
@ -3,7 +3,7 @@
|
||||||
[babashka.impl.common :refer [ctx bb-edn debug]]
|
[babashka.impl.common :refer [ctx bb-edn debug]]
|
||||||
[babashka.impl.deps :as deps]
|
[babashka.impl.deps :as deps]
|
||||||
[babashka.process :as p]
|
[babashka.process :as p]
|
||||||
[clojure.core.async :refer [chan <!! alts!! thread]]
|
[clojure.core.async :refer [<!!]]
|
||||||
[clojure.java.io :as io]
|
[clojure.java.io :as io]
|
||||||
[clojure.string :as str]
|
[clojure.string :as str]
|
||||||
[rewrite-clj.node :as node]
|
[rewrite-clj.node :as node]
|
||||||
|
|
@ -12,7 +12,7 @@
|
||||||
[sci.core :as sci])
|
[sci.core :as sci])
|
||||||
(:import [clojure.core.async.impl.channels ManyToManyChannel]))
|
(:import [clojure.core.async.impl.channels ManyToManyChannel]))
|
||||||
|
|
||||||
(defn chan? [x]
|
(defn -chan? [x]
|
||||||
(instance? ManyToManyChannel x))
|
(instance? ManyToManyChannel x))
|
||||||
|
|
||||||
(def sci-ns (sci/create-ns 'babashka.tasks nil))
|
(def sci-ns (sci/create-ns 'babashka.tasks nil))
|
||||||
|
|
@ -123,7 +123,7 @@
|
||||||
|
|
||||||
(defn -wait [res]
|
(defn -wait [res]
|
||||||
(when res
|
(when res
|
||||||
(if (chan? res)
|
(if (-chan? res)
|
||||||
(let [[_task-name res] (<!! res)]
|
(let [[_task-name res] (<!! res)]
|
||||||
(if (instance? Throwable res)
|
(if (instance? Throwable res)
|
||||||
(throw (ex-info (ex-message res)
|
(throw (ex-info (ex-message res)
|
||||||
|
|
@ -165,13 +165,14 @@
|
||||||
(defn wait-tasks [deps]
|
(defn wait-tasks [deps]
|
||||||
(if deps
|
(if deps
|
||||||
(format "
|
(format "
|
||||||
(let [chans %s]
|
(let [chans (filter babashka.tasks/-chan? %s)]
|
||||||
(loop [cs chans]
|
(loop [cs chans]
|
||||||
(let [[v p] (clojure.core.async/alts!! cs)
|
(let [[v p] (clojure.core.async/alts!! cs)
|
||||||
[task-name v] v
|
[task-name v] v
|
||||||
cs (filterv #(not= p %%) cs)
|
cs (filterv #(not= p %%) cs)
|
||||||
;; _ (.println System/err (str \"n: \" task-name \" v: \" v))
|
;; _ (.println System/err (str \"n: \" task-name \" v: \" v))
|
||||||
_ (intern *ns* (symbol task-name) v)]
|
;; check for existence of v, as the channel may already have been consumed once
|
||||||
|
_ (when v (intern *ns* (symbol task-name) v))]
|
||||||
(when (instance? Throwable v)
|
(when (instance? Throwable v)
|
||||||
(throw (ex-info (ex-message v)
|
(throw (ex-info (ex-message v)
|
||||||
{:babashka/exit 1
|
{:babashka/exit 1
|
||||||
|
|
@ -447,6 +448,7 @@
|
||||||
{'shell (sci/copy-var shell sci-ns)
|
{'shell (sci/copy-var shell sci-ns)
|
||||||
'clojure (sci/copy-var clojure sci-ns)
|
'clojure (sci/copy-var clojure sci-ns)
|
||||||
'-wait (sci/copy-var -wait sci-ns)
|
'-wait (sci/copy-var -wait sci-ns)
|
||||||
|
'-chan? (sci/copy-var -chan? sci-ns)
|
||||||
'-err-thread (sci/copy-var -err-thread sci-ns)
|
'-err-thread (sci/copy-var -err-thread sci-ns)
|
||||||
'*task* task
|
'*task* task
|
||||||
'current-task current-task
|
'current-task current-task
|
||||||
|
|
|
||||||
|
|
@ -224,7 +224,19 @@
|
||||||
(throw (ex-info "0 noes" {})))
|
(throw (ex-info "0 noes" {})))
|
||||||
c {:depends [a b]}}}
|
c {:depends [a b]}}}
|
||||||
(is (thrown-with-msg? Exception #"0 noes"
|
(is (thrown-with-msg? Exception #"0 noes"
|
||||||
(bb "run" "--parallel" "c")))))))
|
(bb "run" "--parallel" "c")))))
|
||||||
|
(testing "edge case"
|
||||||
|
(test-utils/with-config '{:tasks
|
||||||
|
{a (run '-a {:parallel true})
|
||||||
|
-a {:depends [a:a a:b c]
|
||||||
|
:task (prn [a:a a:b c])}
|
||||||
|
a:a {:depends [c]
|
||||||
|
:task (+ 1 2 3)}
|
||||||
|
a:b {:depends [c]
|
||||||
|
:task (do (Thread/sleep 10)
|
||||||
|
(+ 1 2 3))}
|
||||||
|
c (do (Thread/sleep 10) :c)}}
|
||||||
|
(is (= [6 6 :c] (bb "run" "--prn" "a")))))))
|
||||||
|
|
||||||
(deftest list-tasks-test
|
(deftest list-tasks-test
|
||||||
(test-utils/with-config {}
|
(test-utils/with-config {}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue