Skip to content

Commit 9a66ceb

Browse files
committed
Support cancellation during client connection establishment
With #714 we added support for cancelling in-flight HTTP requests by putting the response deferred into an error state. However, this only worked once the underlying TCP connection was established. With this patch, it is now possible to cancel requests even while the connection is still being established (possible since Netty 4.1.108.Final via netty/netty#13849). This also works for `aleph.tcp/client`.
1 parent 659f245 commit 9a66ceb

File tree

7 files changed

+283
-162
lines changed

7 files changed

+283
-162
lines changed

src/aleph/http.clj

+20-14
Original file line numberDiff line numberDiff line change
@@ -100,20 +100,23 @@
100100
will be errors, and a new connection must be created."
101101
[^URI uri options middleware on-closed]
102102
(let [scheme (.getScheme uri)
103-
ssl? (= "https" scheme)]
104-
(-> (client/http-connection
105-
(InetSocketAddress/createUnresolved
106-
(.getHost uri)
107-
(int
108-
(or
109-
(when (pos? (.getPort uri)) (.getPort uri))
110-
(if ssl? 443 80))))
111-
ssl?
112-
(if on-closed
113-
(assoc options :on-closed on-closed)
114-
options))
115-
116-
(d/chain' middleware))))
103+
ssl? (= "https" scheme)
104+
conn (client/http-connection
105+
(InetSocketAddress/createUnresolved
106+
(.getHost uri)
107+
(int
108+
(or
109+
(when (pos? (.getPort uri)) (.getPort uri))
110+
(if ssl? 443 80))))
111+
ssl?
112+
(if on-closed
113+
(assoc options :on-closed on-closed)
114+
options))]
115+
(doto (d/chain' conn middleware)
116+
(d/catch' (fn [e]
117+
(log/trace e "Terminating creation of HTTP connection")
118+
(d/error! conn e)
119+
(d/error-deferred e))))))
117120

118121
(def ^:private connection-stats-callbacks (atom #{}))
119122

@@ -389,6 +392,9 @@
389392
;; function.
390393
(reset! dispose-conn! (fn [] (flow/dispose pool k conn)))
391394

395+
;; allow cancellation during connection establishment
396+
(d/connect result (first conn))
397+
392398
(if (realized? result)
393399
;; to account for race condition between setting `dispose-conn!`
394400
;; and putting `result` into error state for cancellation

src/aleph/http/client.clj

+120-106
Original file line numberDiff line numberDiff line change
@@ -821,112 +821,126 @@
821821
:local-address local-address
822822
:transport (netty/determine-transport transport epoll?)
823823
:name-resolver name-resolver
824-
:connect-timeout connect-timeout})]
825-
826-
(attach-on-close-handler ch-d on-closed)
827-
828-
(d/chain' ch-d
829-
(fn setup-client
830-
[^Channel ch]
831-
(log/debug "Channel:" ch)
832-
833-
;; We know the SSL handshake must be complete because create-client wraps the
834-
;; future with maybe-ssl-handshake-future, so we can get the negotiated
835-
;; protocol, falling back to HTTP/1.1 by default.
836-
(let [pipeline (.pipeline ch)
837-
protocol (cond
838-
ssl?
839-
(or (-> pipeline
840-
^SslHandler (.get ^Class SslHandler)
841-
(.applicationProtocol))
842-
ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed
843-
844-
force-h2c?
845-
(do
846-
(log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.")
847-
ApplicationProtocolNames/HTTP_2)
848-
849-
:else
850-
ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested
851-
setup-opts (assoc opts
852-
:authority authority
853-
:ch ch
854-
:server? false
855-
:keep-alive? keep-alive?
856-
:keep-alive?' keep-alive?'
857-
:logger logger
858-
:non-tun-proxy? non-tun-proxy?
859-
:pipeline pipeline
860-
:pipeline-transform pipeline-transform
861-
:raw-stream? raw-stream?
862-
:remote-address remote-address
863-
:response-buffer-size response-buffer-size
864-
:ssl-context ssl-context
865-
:ssl? ssl?)]
866-
867-
(log/debug (str "Using HTTP protocol: " protocol)
868-
{:authority authority
869-
:ssl? ssl?
870-
:force-h2c? force-h2c?})
871-
872-
;; can't use ApnHandler, because we need to coordinate with Manifold code
873-
(let [http-req-handler
874-
(cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol)
875-
(setup-http1-client setup-opts)
876-
877-
(.equals ApplicationProtocolNames/HTTP_2 protocol)
878-
(do
879-
(http2/setup-conn-pipeline setup-opts)
880-
(http2-req-handler setup-opts))
881-
882-
:else
883-
(do
884-
(let [msg (str "Unknown protocol: " protocol)
885-
e (IllegalStateException. msg)]
886-
(log/error e msg)
887-
(netty/close ch)
888-
(throw e))))]
889-
890-
;; Both Netty and Aleph are set up, unpause the pipeline
891-
(when (.get pipeline "pause-handler")
892-
(log/debug "Unpausing pipeline")
893-
(.remove pipeline "pause-handler"))
894-
895-
(fn http-req-fn
896-
[req]
897-
(log/trace "http-req-fn fired")
898-
(log/debug "client request:" (pr-str req))
899-
900-
;; If :aleph/close is set in the req, closes the channel and
901-
;; returns a deferred containing the result.
902-
(if (or (contains? req :aleph/close)
903-
(contains? req ::close))
904-
(-> ch (netty/close) (netty/wrap-future))
905-
906-
(let [t0 (System/nanoTime)
907-
;; I suspect the below is an error for http1
908-
;; since the shared handler might not match.
909-
;; Should work for HTTP2, though
910-
raw-stream? (get req :raw-stream? raw-stream?)]
911-
912-
(if (or (not (.isActive ch))
913-
(not (.isOpen ch)))
914-
915-
(d/error-deferred
916-
(ex-info "Channel is inactive/closed."
917-
{:req req
918-
:ch ch
919-
:open? (.isOpen ch)
920-
:active? (.isActive ch)}))
921-
922-
(-> (http-req-handler req)
923-
(d/chain' (rsp-handler
924-
{:ch ch
925-
:keep-alive? keep-alive? ; why not keep-alive?'
926-
:raw-stream? raw-stream?
927-
:req req
928-
:response-buffer-size response-buffer-size
929-
:t0 t0})))))))))))))
824+
:connect-timeout connect-timeout})
825+
826+
_ (attach-on-close-handler ch-d on-closed)
827+
828+
close-ch! (atom (fn []))
829+
result (d/deferred)
830+
831+
conn (d/chain' ch-d
832+
(fn setup-client
833+
[^Channel ch]
834+
(log/debug "Channel:" ch)
835+
(reset! close-ch! (fn [] @(-> (netty/close ch) (netty/wrap-future))))
836+
(if (realized? result)
837+
;; Account for race condition between setting `close-ch!` and putting
838+
;; `result` into error state for cancellation
839+
(@close-ch!)
840+
;; We know the SSL handshake must be complete because create-client wraps the
841+
;; future with maybe-ssl-handshake-future, so we can get the negotiated
842+
;; protocol, falling back to HTTP/1.1 by default.
843+
(let [pipeline (.pipeline ch)
844+
protocol (cond
845+
ssl?
846+
(or (-> pipeline
847+
^SslHandler (.get ^Class SslHandler)
848+
(.applicationProtocol))
849+
ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed
850+
851+
force-h2c?
852+
(do
853+
(log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.")
854+
ApplicationProtocolNames/HTTP_2)
855+
856+
:else
857+
ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested
858+
setup-opts (assoc opts
859+
:authority authority
860+
:ch ch
861+
:server? false
862+
:keep-alive? keep-alive?
863+
:keep-alive?' keep-alive?'
864+
:logger logger
865+
:non-tun-proxy? non-tun-proxy?
866+
:pipeline pipeline
867+
:pipeline-transform pipeline-transform
868+
:raw-stream? raw-stream?
869+
:remote-address remote-address
870+
:response-buffer-size response-buffer-size
871+
:ssl-context ssl-context
872+
:ssl? ssl?)]
873+
874+
(log/debug (str "Using HTTP protocol: " protocol)
875+
{:authority authority
876+
:ssl? ssl?
877+
:force-h2c? force-h2c?})
878+
879+
;; can't use ApnHandler, because we need to coordinate with Manifold code
880+
(let [http-req-handler
881+
(cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol)
882+
(setup-http1-client setup-opts)
883+
884+
(.equals ApplicationProtocolNames/HTTP_2 protocol)
885+
(do
886+
(http2/setup-conn-pipeline setup-opts)
887+
(http2-req-handler setup-opts))
888+
889+
:else
890+
(do
891+
(let [msg (str "Unknown protocol: " protocol)
892+
e (IllegalStateException. msg)]
893+
(log/error e msg)
894+
(netty/close ch)
895+
(throw e))))]
896+
897+
;; Both Netty and Aleph are set up, unpause the pipeline
898+
(when (.get pipeline "pause-handler")
899+
(log/debug "Unpausing pipeline")
900+
(.remove pipeline "pause-handler"))
901+
902+
(fn http-req-fn
903+
[req]
904+
(log/trace "http-req-fn fired")
905+
(log/debug "client request:" (pr-str req))
906+
907+
;; If :aleph/close is set in the req, closes the channel and
908+
;; returns a deferred containing the result.
909+
(if (or (contains? req :aleph/close)
910+
(contains? req ::close))
911+
(-> ch (netty/close) (netty/wrap-future))
912+
913+
(let [t0 (System/nanoTime)
914+
;; I suspect the below is an error for http1
915+
;; since the shared handler might not match.
916+
;; Should work for HTTP2, though
917+
raw-stream? (get req :raw-stream? raw-stream?)]
918+
919+
(if (or (not (.isActive ch))
920+
(not (.isOpen ch)))
921+
922+
(d/error-deferred
923+
(ex-info "Channel is inactive/closed."
924+
{:req req
925+
:ch ch
926+
:open? (.isOpen ch)
927+
:active? (.isActive ch)}))
928+
929+
(-> (http-req-handler req)
930+
(d/chain' (rsp-handler
931+
{:ch ch
932+
:keep-alive? keep-alive? ; why not keep-alive?'
933+
:raw-stream? raw-stream?
934+
:req req
935+
:response-buffer-size response-buffer-size
936+
:t0 t0}))))))))))))]
937+
(d/connect conn result)
938+
(d/catch' result (fn [e]
939+
(log/trace e "Closing HTTP connection channel")
940+
(d/error! ch-d e)
941+
(@close-ch!)
942+
(d/error-deferred e)))
943+
result))
930944

931945

932946

src/aleph/netty.clj

+44-29
Original file line numberDiff line numberDiff line change
@@ -1521,6 +1521,14 @@
15211521
(ssl-handler ch ssl-ctx))))
15221522
(pipeline-builder p))))
15231523

1524+
(defn- connect-client
1525+
^ChannelFuture [^Bootstrap bootstrap
1526+
^SocketAddress remote-address
1527+
^SocketAddress local-address]
1528+
(if local-address
1529+
(.connect bootstrap remote-address local-address)
1530+
(.connect bootstrap remote-address)))
1531+
15241532
(defn ^:no-doc create-client-chan
15251533
"Returns a deferred containing a new Channel.
15261534
@@ -1529,8 +1537,8 @@
15291537
complete."
15301538
[{:keys [pipeline-builder
15311539
bootstrap-transform
1532-
^SocketAddress remote-address
1533-
^SocketAddress local-address
1540+
remote-address
1541+
local-address
15341542
transport
15351543
name-resolver
15361544
connect-timeout]
@@ -1543,32 +1551,39 @@
15431551
(throw (IllegalArgumentException. "Can't use :ssl-context anymore.")))
15441552

15451553
(let [^Class chan-class (transport-channel-class transport)
1546-
initializer (pipeline-initializer pipeline-builder)]
1547-
(try
1548-
(let [client-event-loop-group @(transport-client-group transport)
1549-
resolver' (when (some? name-resolver)
1550-
(cond
1551-
(= :default name-resolver) nil
1552-
(= :noop name-resolver) NoopAddressResolverGroup/INSTANCE
1553-
(instance? AddressResolverGroup name-resolver) name-resolver))
1554-
bootstrap (doto (Bootstrap.)
1555-
(.option ChannelOption/SO_REUSEADDR true)
1556-
(.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout))
1557-
#_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5
1558-
(.group client-event-loop-group)
1559-
(.channel chan-class)
1560-
(.handler initializer)
1561-
(.resolver resolver')
1562-
bootstrap-transform)
1563-
1564-
fut (if local-address
1565-
(.connect bootstrap remote-address local-address)
1566-
(.connect bootstrap remote-address))]
1567-
1568-
(d/chain' (wrap-future fut)
1569-
(fn [_]
1570-
(let [ch (.channel ^ChannelFuture fut)]
1571-
(maybe-ssl-handshake-future ch))))))))
1554+
initializer (pipeline-initializer pipeline-builder)
1555+
client-event-loop-group @(transport-client-group transport)
1556+
resolver' (when (some? name-resolver)
1557+
(cond
1558+
(= :default name-resolver) nil
1559+
(= :noop name-resolver) NoopAddressResolverGroup/INSTANCE
1560+
(instance? AddressResolverGroup name-resolver) name-resolver))
1561+
bootstrap (doto (Bootstrap.)
1562+
(.option ChannelOption/SO_REUSEADDR true)
1563+
(.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout))
1564+
#_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5
1565+
(.group client-event-loop-group)
1566+
(.channel chan-class)
1567+
(.handler initializer)
1568+
(.resolver resolver')
1569+
bootstrap-transform)
1570+
1571+
fut (connect-client bootstrap remote-address local-address)]
1572+
(doto (-> (wrap-future fut)
1573+
(d/chain'
1574+
(fn [_]
1575+
(let [ch (.channel ^ChannelFuture fut)]
1576+
(maybe-ssl-handshake-future ch)))))
1577+
(d/catch' (fn [e]
1578+
(when-not (.isDone fut)
1579+
(log/trace e "Cancelling Bootstrap#connect future")
1580+
(when-not (.cancel fut true)
1581+
(when-not (.isDone fut)
1582+
(log/warn "Transport" transport "does not support cancellation of connection attempts."
1583+
"Instead, you have to wait for the connect timeout to expire for it to be terminated."
1584+
"Its current value is" connect-timeout "ms."
1585+
"It can be set via the `connect-timeout` option."))))
1586+
(d/error-deferred e))))))
15721587

15731588

15741589
(defn ^:no-doc ^:deprecated create-client
@@ -1732,7 +1747,7 @@
17321747
(fn [shutdown-output]
17331748
(when (= shutdown-output ::timeout)
17341749
(log/error
1735-
(format "Timeout while waiting for requests to close (exceeded: %ss)"
1750+
(format "Timeout while waiting for connections to close (exceeded: %ss)"
17361751
shutdown-timeout)))))
17371752
(d/finally'
17381753
;; 3. At this stage, stop the EventLoopGroup, this will cancel any

0 commit comments

Comments
 (0)