parent
5c6e2b5167
commit
141881376d
3 changed files with 109 additions and 50 deletions
1
deps.edn
1
deps.edn
|
|
@ -124,7 +124,6 @@
|
||||||
io.lambdaforge/datalog-parser {:mvn/version "0.1.9"}
|
io.lambdaforge/datalog-parser {:mvn/version "0.1.9"}
|
||||||
clj-stacktrace/clj-stacktrace {:mvn/version "0.2.8"}
|
clj-stacktrace/clj-stacktrace {:mvn/version "0.2.8"}
|
||||||
clojure-msgpack/clojure-msgpack {:mvn/version "1.2.1"}
|
clojure-msgpack/clojure-msgpack {:mvn/version "1.2.1"}
|
||||||
cli-matic {:git/url "https://github.com/l3nz/cli-matic.git", :git/sha "9cd53ba7336363e3d06650dbad413b6f8b06e471"}
|
|
||||||
cli-matic/cli-matic {:git/url "https://github.com/l3nz/cli-matic.git", :git/sha "9cd53ba7336363e3d06650dbad413b6f8b06e471"}}
|
cli-matic/cli-matic {:git/url "https://github.com/l3nz/cli-matic.git", :git/sha "9cd53ba7336363e3d06650dbad413b6f8b06e471"}}
|
||||||
:classpath-overrides {org.clojure/clojure nil
|
:classpath-overrides {org.clojure/clojure nil
|
||||||
org.clojure/spec.alpha nil}}
|
org.clojure/spec.alpha nil}}
|
||||||
|
|
|
||||||
|
|
@ -152,25 +152,40 @@
|
||||||
(format "(babashka.tasks/-wait %s)" task-name)
|
(format "(babashka.tasks/-wait %s)" task-name)
|
||||||
task-name))))
|
task-name))))
|
||||||
|
|
||||||
|
(def o (Object.))
|
||||||
|
|
||||||
|
#_:clj-kondo/ignore
|
||||||
|
(defn- log
|
||||||
|
"Used internally for debugging"
|
||||||
|
[& strs]
|
||||||
|
(locking o
|
||||||
|
(apply prn strs)))
|
||||||
|
|
||||||
(defn wait-tasks [deps]
|
(defn wait-tasks [deps]
|
||||||
(if deps
|
(if deps
|
||||||
(format "
|
(format
|
||||||
(let [chans (filter babashka.tasks/-chan? %s)]
|
(pr-str
|
||||||
|
'(let [chans (filter babashka.tasks/-chan? %s)]
|
||||||
(loop [cs chans]
|
(loop [cs chans]
|
||||||
(when (seq cs)
|
(when (seq cs)
|
||||||
(let [[v p] (clojure.core.async/alts!! cs)
|
(let [[v* p] (clojure.core.async/alts!! cs)
|
||||||
[task-name v] v
|
[task-name v] v*
|
||||||
cs (filterv #(not= p %%) cs)
|
cs (filterv #(not= p %) cs)
|
||||||
;; _ (.println System/err (str \"n: \" task-name \" v: \" v))
|
_ (when v* (intern *ns* (symbol task-name) v))]
|
||||||
;; check for existence of v, as the channel may already have been consumed once
|
|
||||||
_ (when v (intern *ns* (symbol task-name) v))]
|
|
||||||
(when (instance? Throwable v)
|
(when (instance? Throwable v)
|
||||||
(throw (ex-info (ex-message v)
|
(throw (ex-info (ex-message v)
|
||||||
{:babashka/exit 1
|
{:babashka/exit 1
|
||||||
:data (ex-data v)})))
|
:data (ex-data v)})))
|
||||||
(recur cs)))))" deps)
|
(recur cs))))
|
||||||
"")
|
;; since resolving channels into values may happen in parallel and some
|
||||||
#_(format "(def %s (babashka.tasks/-wait %s))" dep dep))
|
;; channels may have been resolved on other threads, we should wait
|
||||||
|
;; until all deps have been interned as values rather than chans
|
||||||
|
;; see issue 1190
|
||||||
|
(loop [deps '%s]
|
||||||
|
(when (some (fn [task-name]
|
||||||
|
(babashka.tasks/-chan? (deref (resolve (symbol task-name))))) deps)
|
||||||
|
(recur deps))))) deps deps)
|
||||||
|
""))
|
||||||
|
|
||||||
(defn wrap-enter-leave [task-name prog enter leave]
|
(defn wrap-enter-leave [task-name prog enter leave]
|
||||||
(str (pr-str enter) "\n"
|
(str (pr-str enter) "\n"
|
||||||
|
|
@ -184,7 +199,8 @@
|
||||||
|
|
||||||
(defn wrap-depends [prog depends parallel?]
|
(defn wrap-depends [prog depends parallel?]
|
||||||
(if parallel?
|
(if parallel?
|
||||||
(format "(do %s)" (str (str "\n" (wait-tasks depends)) "\n" prog))
|
(format "(do %s)" (str (str "\n" (wait-tasks depends))
|
||||||
|
"\n" prog))
|
||||||
prog))
|
prog))
|
||||||
|
|
||||||
(defn assemble-task-1
|
(defn assemble-task-1
|
||||||
|
|
@ -231,7 +247,7 @@
|
||||||
%s ;; deps
|
%s ;; deps
|
||||||
|
|
||||||
(ns %s %s)
|
(ns %s %s)
|
||||||
(require '[babashka.tasks])
|
(require '[babashka.tasks #_#_:refer [log]])
|
||||||
(when-not (resolve 'clojure)
|
(when-not (resolve 'clojure)
|
||||||
;; we don't use refer so users can override this
|
;; we don't use refer so users can override this
|
||||||
(intern *ns* 'clojure babashka.tasks/clojure))
|
(intern *ns* 'clojure babashka.tasks/clojure))
|
||||||
|
|
@ -441,4 +457,5 @@
|
||||||
'*task* task
|
'*task* task
|
||||||
'current-task current-task
|
'current-task current-task
|
||||||
'current-state state
|
'current-state state
|
||||||
'run (sci/copy-var run sci-ns)})
|
'run (sci/copy-var run sci-ns)
|
||||||
|
#_#_'log log})
|
||||||
|
|
|
||||||
|
|
@ -373,3 +373,46 @@ even more stuff here\"
|
||||||
(testing "bb.edn without :deps should not require deps.clj"
|
(testing "bb.edn without :deps should not require deps.clj"
|
||||||
(test-utils/with-config '{:tasks {a 1}}
|
(test-utils/with-config '{:tasks {a 1}}
|
||||||
(bb "-e" "(+ 1 2 3)"))))))
|
(bb "-e" "(+ 1 2 3)"))))))
|
||||||
|
|
||||||
|
(deftest deps-race-condition-test
|
||||||
|
(test-utils/with-config
|
||||||
|
(pr-str '{:tasks {task-b (do
|
||||||
|
(Thread/sleep 10)
|
||||||
|
:task00-out)
|
||||||
|
task-c {:depends [task-b]
|
||||||
|
:task (do
|
||||||
|
(println
|
||||||
|
"task-b: "
|
||||||
|
(type task-b))
|
||||||
|
{})}
|
||||||
|
task-a {:task (do
|
||||||
|
(Thread/sleep 10)
|
||||||
|
:task0-out)}
|
||||||
|
task-e {:depends [task-e1] :task {}}
|
||||||
|
task-e2 {:depends [task-a] :task {}}
|
||||||
|
task-e3 {:depends [task-b] :task {}}
|
||||||
|
task-e1 {:depends [task-e2 task-e3]
|
||||||
|
:task {}}
|
||||||
|
task-h {:depends [task-a task-b]
|
||||||
|
:task {}}
|
||||||
|
task-d {:task (do (Thread/sleep 2) {})}
|
||||||
|
task-f {:depends [task-d task-e task-a]
|
||||||
|
:task {}}
|
||||||
|
task-g {:depends [task-f
|
||||||
|
task-d
|
||||||
|
task-a
|
||||||
|
task-c
|
||||||
|
task-h]
|
||||||
|
:task {}}}})
|
||||||
|
(time (dotimes [_ 50]
|
||||||
|
(is (str/includes? (test-utils/bb nil "run" "--parallel" "task-g")
|
||||||
|
"task-b: clojure.lang.Keyword"))))))
|
||||||
|
|
||||||
|
(deftest parallel-nil-results-test
|
||||||
|
(test-utils/with-config
|
||||||
|
(pr-str '{:tasks {a (do nil)
|
||||||
|
b (do nil)
|
||||||
|
c (do nil)
|
||||||
|
d {:depends [a b c]
|
||||||
|
:task (prn [a b c])}}})
|
||||||
|
(is (= [nil nil nil] (bb "run" "--parallel" "d")))))
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue