Skip to content

Commit c684f65

Browse files
committed
HTTP client request cancellation
This patch changes `aleph.http/request` so that setting the response deferred to an error status will terminate an in-flight request. This allows e.g. for `d/timeout!` to be used without potentially leaking connections. For convenient explicit cancellation, we provide `aleph.http/cancel-request!`. It sets the given response deferred to error with an instance of the new `aleph.utils.RequestCancellationException`. Closes #712.
1 parent 3304d64 commit c684f65

File tree

3 files changed

+154
-93
lines changed

3 files changed

+154
-93
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package aleph.utils;
2+
3+
import java.util.concurrent.CancellationException;
4+
5+
public class RequestCancellationException extends CancellationException {
6+
7+
public RequestCancellationException() { }
8+
9+
public RequestCancellationException(String message) {
10+
super(message);
11+
}
12+
13+
public RequestCancellationException(Throwable cause) {
14+
super(cause.getMessage());
15+
initCause(cause);
16+
}
17+
18+
public RequestCancellationException(String message, Throwable cause) {
19+
super(message);
20+
initCause(cause);
21+
}
22+
23+
}

src/aleph/http.clj

+109-90
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
ConnectionTimeoutException
2121
PoolTimeoutException
2222
ReadTimeoutException
23+
RequestCancellationException
2324
RequestTimeoutException)
2425
(io.aleph.dirigiste Pools)
2526
(io.netty.handler.codec Headers)
@@ -336,6 +337,9 @@
336337
by [clj-http](https://github.com/dakrone/clj-http), and returns a deferred representing
337338
the HTTP response. Also allows for a custom `pool` or `middleware` to be defined.
338339
340+
Putting the returned deferred into an error state will cancel the underlying request if it is
341+
still in flight.
342+
339343
Param key | Description
340344
-------------------- | -----------------------------------------------------------------------------------------------------------------------------------------------------------------
341345
`connection-timeout` | timeout in milliseconds for the connection to become established
@@ -358,96 +362,111 @@
358362
middleware identity
359363
connection-timeout 6e4} ;; 60 seconds
360364
:as req}]
361-
362-
(executor/with-executor response-executor
363-
((middleware
364-
(fn [req]
365-
(let [k (client/req->domain req)
366-
start (System/currentTimeMillis)]
367-
368-
;; acquire a connection
369-
(-> (flow/acquire pool k)
370-
(maybe-timeout! pool-timeout)
371-
372-
;; pool timeout triggered
373-
(d/catch' TimeoutException
374-
(fn [^Throwable e]
375-
(d/error-deferred (PoolTimeoutException. e))))
376-
377-
(d/chain'
378-
(fn [conn]
379-
380-
;; get the wrapper for the connection, which may or may not be realized yet
381-
(-> (first conn)
382-
(maybe-timeout! connection-timeout)
383-
384-
;; connection timeout triggered, dispose of the connetion
385-
(d/catch' TimeoutException
386-
(fn [^Throwable e]
387-
(log/error e "Timed out waiting for connection to be established")
388-
(flow/dispose pool k conn)
389-
(d/error-deferred (ConnectionTimeoutException. e))))
390-
391-
;; connection failed, bail out
392-
(d/catch'
393-
(fn [e]
394-
(log/error e "Connection failure")
395-
(flow/dispose pool k conn)
396-
(d/error-deferred e)))
397-
398-
;; actually make the request now
399-
(d/chain'
400-
(fn [conn']
401-
(when-not (nil? conn')
402-
(let [end (System/currentTimeMillis)]
403-
(-> (conn' req)
404-
(maybe-timeout! request-timeout)
405-
406-
;; request timeout triggered, dispose of the connection
407-
(d/catch' TimeoutException
408-
(fn [^Throwable e]
409-
(flow/dispose pool k conn)
410-
(d/error-deferred (RequestTimeoutException. e))))
411-
412-
;; request failed, dispose of the connection
413-
(d/catch'
414-
(fn [e]
415-
(log/trace "Request failed. Disposing of connection...")
416-
(flow/dispose pool k conn)
417-
(d/error-deferred e)))
418-
419-
;; clean up the connection
420-
(d/chain'
421-
(fn cleanup-conn [rsp]
422-
423-
;; either destroy/dispose of the conn, or release it back for reuse
424-
(-> (:aleph/destroy-conn? rsp)
425-
(maybe-timeout! read-timeout)
426-
427-
(d/catch' TimeoutException
428-
(fn [^Throwable e]
429-
(log/trace "Request timed out. Disposing of connection...")
430-
(flow/dispose pool k conn)
431-
(d/error-deferred (ReadTimeoutException. e))))
432-
433-
(d/chain'
434-
(fn [early?]
435-
(if (or early?
436-
(not (:aleph/keep-alive? rsp))
437-
(<= 400 (:status rsp)))
438-
(do
439-
(log/trace "Connection finished. Disposing...")
440-
(flow/dispose pool k conn))
441-
(flow/release pool k conn)))))
442-
(-> rsp
443-
(dissoc :aleph/destroy-conn?)
444-
(assoc :connection-time (- end start)))))))))
445-
446-
(fn handle-response [rsp]
447-
(->> rsp
448-
(middleware/handle-cookies req)
449-
(middleware/handle-redirects request req)))))))))))
450-
req))))
365+
(let [dispose-conn! (atom (fn []))
366+
result (d/deferred response-executor)
367+
response (executor/with-executor response-executor
368+
((middleware
369+
(fn [req]
370+
(let [k (client/req->domain req)
371+
start (System/currentTimeMillis)]
372+
373+
;; acquire a connection
374+
(-> (flow/acquire pool k)
375+
(maybe-timeout! pool-timeout)
376+
377+
;; pool timeout triggered
378+
(d/catch' TimeoutException
379+
(fn [^Throwable e]
380+
(d/error-deferred (PoolTimeoutException. e))))
381+
382+
(d/chain'
383+
(fn [conn]
384+
;; NOTE: All error handlers below delegate disposal of the
385+
;; connection to the error handler on `result` which uses this
386+
;; function.
387+
(reset! dispose-conn! (fn [] (flow/dispose pool k conn)))
388+
389+
(if (realized? result)
390+
;; to account for race condition between setting `dispose-conn!`
391+
;; and putting `result` into error state for cancellation
392+
(@dispose-conn!)
393+
;; get the wrapper for the connection, which may or may not be realized yet
394+
(-> (first conn)
395+
(maybe-timeout! connection-timeout)
396+
397+
;; connection timeout triggered
398+
(d/catch' TimeoutException
399+
(fn [^Throwable e]
400+
(log/error e "Timed out waiting for connection to be established")
401+
(d/error-deferred (ConnectionTimeoutException. e))))
402+
403+
;; connection failed, bail out
404+
(d/catch'
405+
(fn [e]
406+
(log/error e "Connection failure")
407+
(d/error-deferred e)))
408+
409+
;; actually make the request now
410+
(d/chain'
411+
(fn [conn']
412+
(when-not (nil? conn')
413+
(let [end (System/currentTimeMillis)]
414+
(-> (conn' req)
415+
(maybe-timeout! request-timeout)
416+
417+
;; request timeout triggered
418+
(d/catch' TimeoutException
419+
(fn [^Throwable e]
420+
(d/error-deferred (RequestTimeoutException. e))))
421+
422+
;; clean up the connection
423+
(d/chain'
424+
(fn cleanup-conn [rsp]
425+
426+
;; either destroy/dispose of the conn, or release it back for reuse
427+
(-> (:aleph/destroy-conn? rsp)
428+
(maybe-timeout! read-timeout)
429+
430+
;; read timeout triggered
431+
(d/catch' TimeoutException
432+
(fn [^Throwable e]
433+
(log/trace "Request timed out.")
434+
(d/error-deferred (ReadTimeoutException. e))))
435+
436+
(d/chain'
437+
(fn [early?]
438+
(if (or early?
439+
(not (:aleph/keep-alive? rsp))
440+
(<= 400 (:status rsp)))
441+
(do
442+
(log/trace "Connection finished. Disposing...")
443+
(flow/dispose pool k conn))
444+
(flow/release pool k conn)))))
445+
(-> rsp
446+
(dissoc :aleph/destroy-conn?)
447+
(assoc :connection-time (- end start)))))))))
448+
449+
(fn handle-response [rsp]
450+
(->> rsp
451+
(middleware/handle-cookies req)
452+
(middleware/handle-redirects request req))))))))))))
453+
req))]
454+
(d/connect response result)
455+
(d/catch' result
456+
(fn [e]
457+
(log/trace e "Request failed. Disposing of connection...")
458+
(@dispose-conn!)
459+
(d/error-deferred e)))
460+
result)))
461+
462+
(defn cancel-request!
463+
"Accepts a response deferred as returned by `request` and cancels the underlying request if it is
464+
still in flight.
465+
466+
This is done by putting the deferred into error state with an
467+
`aleph.utils.RequestCancellationException` instance as its value."
468+
[r]
469+
(d/error! r (RequestCancellationException. "Request cancelled")))
451470

452471
(defn- req
453472
([method url]

test/aleph/http_test.clj

+22-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
(:import
1818
(aleph.utils
1919
ConnectionTimeoutException
20+
RequestCancellationException
2021
RequestTimeoutException)
2122
(clojure.lang
2223
ExceptionInfo)
@@ -1073,11 +1074,14 @@
10731074
(Thread/sleep 5)
10741075
(s/put! s (encode-http-object response))))
10751076

1076-
(defmacro with-tcp-response [response & body]
1077-
`(with-server (tcp/start-server (tcp-handler ~response) {:port port
1078-
:shutdown-timeout 0})
1077+
(defmacro with-tcp-server [handler & body]
1078+
`(with-server (tcp/start-server ~handler {:port port
1079+
:shutdown-timeout 0})
10791080
~@body))
10801081

1082+
(defmacro with-tcp-response [response & body]
1083+
`(with-tcp-server (tcp-handler ~response) ~@body))
1084+
10811085
(defmacro with-tcp-request-handler [handler options request & body]
10821086
`(with-server (http/start-server ~handler (merge http-server-options ~options))
10831087
(let [conn# @(tcp/client {:host "localhost" :port port})
@@ -1438,3 +1442,18 @@
14381442
:http-versions [:http1]})]
14391443
(is (instance? IllegalArgumentException result))
14401444
(is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result))))))
1445+
1446+
(deftest test-in-flight-request-cancellation
1447+
(let [conn-established (promise)
1448+
conn-closed (promise)]
1449+
(with-tcp-server (fn [s _]
1450+
(deliver conn-established true)
1451+
;; Required for the client close to be detected
1452+
(s/consume identity s)
1453+
(s/on-closed s (fn []
1454+
(deliver conn-closed true))))
1455+
(let [rsp (http-get "/")]
1456+
(is (= true (deref conn-established 1000 :timeout)))
1457+
(http/cancel-request! rsp)
1458+
(is (= true (deref conn-closed 1000 :timeout)))
1459+
(is (thrown? RequestCancellationException (deref rsp 1000 :timeout)))))))

0 commit comments

Comments
 (0)