Skip to content

Commit

Permalink
Make all thread functions rejection exception safe
Browse files Browse the repository at this point in the history
  • Loading branch information
niwinz committed Dec 4, 2023
1 parent ae41f5e commit 8155a71
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 27 deletions.
12 changes: 7 additions & 5 deletions src/promesa/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
d))
([f executor]
(c/let [d (impl/deferred)]
(exec/run! executor (fn []
(exec/run executor (fn []
(try
(f #(pt/-resolve! d %)
#(pt/-reject! d %))
Expand Down Expand Up @@ -610,8 +610,10 @@
"Analogous to `clojure.core.async/thread` that returns a promise
instance instead of the `Future`. Useful for executing synchronous
code in a separate thread (also works in cljs)."
([f] (exec/submit! :thread f))
([executor f] (exec/submit! executor f)))
([f]
(exec/submit :thread f))
([executor f]
(exec/submit executor f)))

(defn vthread-call
"A shortcut for `(p/thread-call :vthread f)`."
Expand Down Expand Up @@ -664,14 +666,14 @@
(~rej-s ~err-s)
(if (recur? ~res-s)
(do
(promesa.exec/run!
(promesa.exec/run
:vthread
~(if (seq names)
`(fn [] (apply ~tsym (:bindings ~res-s)))
tsym))
nil)
(~rsv-s ~res-s)))))))]
(promesa.exec/run!
(promesa.exec/run
:vthread
~(if (seq names)
`(fn [] (~tsym ~@fvals))
Expand Down
113 changes: 95 additions & 18 deletions src/promesa/exec.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
[promesa.protocols :as pt]
[promesa.util :as pu]
#?(:cljs [goog.object :as gobj])
#?(:cljs [promesa.impl.promise :as impl]))
#?(:cljs [promesa.impl.promise :as pimpl]))
#?(:clj
(:import
clojure.lang.Var
Expand Down Expand Up @@ -235,43 +235,118 @@

(defn exec!
"Run the task in the provided executor, returns `nil`. Analogous to
the `(.execute executor f)`. Fire and forget."
the `(.execute executor f)`. Fire and forget.
Exception unsafe, can raise exceptions if the executor
rejects the task."

([f]
(pt/-exec! (resolve-executor *default-executor*) f))
(let [f (if (fn? f) (wrap-bindings f) f)]
(pt/-exec! (resolve-executor *default-executor*) f)))
([executor f]
(pt/-exec! (resolve-executor executor) f)))
(let [f (if (fn? f) (wrap-bindings f) f)]
(pt/-exec! (resolve-executor executor) f))))

(defn run!
"Run the task in the provided executor."
"Run the task in the provided executor.
Exception unsafe, can raise exceptions if the executor
rejects the task."
([f]
(pt/-run! (resolve-executor *default-executor*) f))
(let [f (if (fn? f) (wrap-bindings f) f)]
(pt/-run! (resolve-executor *default-executor*) f)))
([executor f]
(pt/-run! (resolve-executor executor) f)))
(let [f (if (fn? f) (wrap-bindings f) f)]
(pt/-run! (resolve-executor executor) f))))

(defn submit!
"Submit a task to be executed in a provided executor
and return a promise that will be completed with
the return value of a task.
A task is a plain clojure function."
Exception unsafe, can raise exceptions if the executor
rejects the task."
([f]
(let [f (if (fn? f) (wrap-bindings f) f)]
(pt/-submit! (resolve-executor *default-executor*) f))
([executor f]
(let [f (if (fn? f) (wrap-bindings f) f)]
(pt/-submit! (resolve-executor executor) f)))))
(pt/-submit! (resolve-executor *default-executor*) f)))
([executor f]
(let [f (if (fn? f) (wrap-bindings f) f)]
(pt/-submit! (resolve-executor executor) f))))

(defn schedule!
"Schedule a callable to be executed after the `ms` delay
is reached.
In JVM it uses a scheduled executor service and in JS
it uses the `setTimeout` function."
it uses the `setTimeout` function.
Exception unsafe, can raise exceptions if the executor
rejects the task."
([ms f]
(pt/-schedule! (resolve-scheduler) ms f))
([scheduler ms f]
(pt/-schedule! (resolve-scheduler scheduler) ms f)))


(defn- rejected
[v]
#?(:cljs (pimpl/rejected v)
:clj (let [p (CompletableFuture.)]
(.completeExceptionally ^CompletableFuture p v)
p)))

(defn exec
"Exception safe version of `exec!`. It always returns an promise instance."
([f]
(try
(exec! f)
(catch #?(:clj Throwable :cljs :default) cause
(rejected cause))))
([executor f]
(try
(exec! executor f)
(catch #?(:clj Throwable :cljs :default) cause
(rejected cause)))))

(defn run
"Exception safe version of `run!`. It always returns an promise instance."
([f]
(try
(run! f)
(catch #?(:clj Throwable :cljs :default) cause
(rejected cause))))
([executor f]
(try
(run! executor f)
(catch #?(:clj Throwable :cljs :default) cause
(rejected cause)))))

(defn submit
"Exception safe version of `submit!`. It always returns an promise instance."
([f]
(try
(submit! f)
(catch #?(:clj Throwable :cljs :default) cause
(rejected cause))))
([executor f]
(try
(submit! executor f)
(catch #?(:clj Throwable :cljs :default) cause
(rejected cause)))))

(defn schedule
"Exception safe version of `schedule!`. It always returns an promise instance."
([ms f]
(try
(schedule! ms f)
(catch #?(:clj Throwable :cljs :default) cause
(rejected cause))))
([scheduler ms f]
(try
(schedule! scheduler ms f)
(catch #?(:clj Throwable :cljs :default) cause
(rejected cause)))))

;; --- Pool & Thread Factories

#?(:clj
Expand Down Expand Up @@ -421,7 +496,7 @@
(deftype Scheduler []
pt/IScheduler
(-schedule! [_ ms f]
(let [df (impl/deferred)
(let [df (pimpl/deferred)
tid (js/setTimeout
(fn []
(try
Expand All @@ -431,7 +506,7 @@
ms)]
(pt/-fnly df
(fn [_ c]
(when (impl/isCancellationError c)
(when (pimpl/isCancellationError c)
(js/clearTimeout tid))))
df))))

Expand Down Expand Up @@ -488,7 +563,7 @@
(reify
pt/IExecutor
(-exec! [this f]
(impl/nextTick f))
(pimpl/nextTick f))

(-run! [this f]
(-> (pt/-promise nil)
Expand Down Expand Up @@ -594,7 +669,7 @@
service. The returned promise is not cancellable (the body will be
executed independently of the cancellation)."
[executor & body]
`(-> (submit! ~executor (^:once fn* [] ~@body))
`(-> (submit ~executor (^:once fn* [] ~@body))
(pt/-mcat pt/-promise)))

(defmacro with-dispatch!
Expand All @@ -605,7 +680,7 @@
[executor & body]
(when (:ns &env)
(throw (ex-info "cljs not supported on with-dispatch! macro" {})))
`(-> (submit! ~executor (^:once fn* [] ~@body))
`(-> (submit ~executor (^:once fn* [] ~@body))
(pt/-mcat pt/-promise)
(pt/-await!)))

Expand Down Expand Up @@ -931,6 +1006,8 @@
(pt/-await! resource duration))))

(defn close!
{:no-doc true
:deprecated true}
([o]
(pt/-close! o))
([o reason]
Expand Down
19 changes: 19 additions & 0 deletions src/promesa/util.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,25 @@
[& exprs]
`(try* (^:once fn* [] ~@exprs) identity))

(defn close!
([o]
(pt/-close! o))
([o reason]
(pt/-close! o reason)))

(extend-protocol pt/ICloseable
java.util.concurrent.ExecutorService
(-closed? [it]
(.isShutdown it))
(-close! [it]
(.close it))

java.lang.AutoCloseable
(-closed? [_]
(throw (IllegalArgumentException. "not implemented")))
(-close! [it]
(.close ^java.lang.AutoCloseable it)))

(defmacro with-open
[bindings & body]
{:pre [(vector? bindings)
Expand Down
2 changes: 1 addition & 1 deletion test/promesa/tests/exec_bulkhead_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

;; (t/deftest basic-operations-submit
;; (let [instance (pbh/create {:permits 1 :type :executor})]
;; (let [res (px/submit! instance (timing-fn))]
;; (let [res (px/submit instance (timing-fn))]
;; (t/is (p/promise? res))
;; (t/is (< @res 10000000)))))

Expand Down
5 changes: 2 additions & 3 deletions test/promesa/tests/exec_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[promesa.core :as p]
[promesa.exec.bulkhead :as pbh]
[promesa.exec :as px]
[promesa.util :as pu]
[clojure.test :as t]))

(t/use-fixtures
Expand Down Expand Up @@ -47,9 +48,7 @@

(t/deftest with-executor-closes-pool-2
(let [executor (px/single-executor)]
(px/with-executor ^:interrupt executor
(px/with-dispatch executor
(Thread/sleep 1000)))
(pu/close! executor)
(t/is (thrown? java.util.concurrent.RejectedExecutionException
(px/submit! executor (constantly nil))))))

Expand Down

0 comments on commit 8155a71

Please sign in to comment.