diff --git a/src/aleph/http.clj b/src/aleph/http.clj index 08efc6a4..cf71d9b8 100644 --- a/src/aleph/http.clj +++ b/src/aleph/http.clj @@ -51,6 +51,7 @@ | --- | --- | | `port` | The port the server will bind to. If `0`, the server will bind to a random port. | | `socket-address` | A `java.net.SocketAddress` specifying both the port and interface to bind to. | + | `existing-channel` | A pre-bound `java.nio.channels.ServerSocketChannel` for the server to use rather than opening and binding its own server-socket. It won't be closed by the server. Possibly obtained from `System/inheritedChannel`. | | `bootstrap-transform` | A function that takes an `io.netty.bootstrap.ServerBootstrap` object, which represents the server, and modifies it. | | `http-versions` | An optional vector of allowable HTTP versions to negotiate via ALPN, in preference order. Defaults to `[:http1]`. | | `ssl-context` | An `io.netty.handler.ssl.SslContext` object or a map of SSL context options (see `aleph.netty/ssl-server-context` for more details) if an SSL connection is desired. When passing an `io.netty.handler.ssl.SslContext` object, it must have an ALPN config matching the `http-versions` option (see `aleph.netty/ssl-server-context` and `aleph.netty/application-protocol-config`). If only HTTP/1.1 is desired, ALPN config is optional. diff --git a/src/aleph/http/server.clj b/src/aleph/http/server.clj index 1a9677d1..6d59ddc6 100644 --- a/src/aleph/http/server.clj +++ b/src/aleph/http/server.clj @@ -728,6 +728,7 @@ [handler {:keys [port socket-address + existing-channel executor http-versions ssl-context @@ -773,9 +774,10 @@ (netty/start-server {:pipeline-builder pipeline-builder :bootstrap-transform bootstrap-transform - :socket-address (if socket-address - socket-address - (InetSocketAddress. port)) + :socket-address (cond + socket-address socket-address + (nil? existing-channel) (InetSocketAddress. port)) + :existing-channel existing-channel :on-close (when (and shutdown-executor? (or (instance? ExecutorService executor) (instance? ExecutorService continue-executor))) diff --git a/src/aleph/netty.clj b/src/aleph/netty.clj index 49336df4..08d12b54 100644 --- a/src/aleph/netty.clj +++ b/src/aleph/netty.clj @@ -21,6 +21,7 @@ Unpooled) (io.netty.channel Channel + ChannelFactory ChannelFuture ChannelHandler ChannelHandlerContext @@ -1667,6 +1668,22 @@ (.addFirst pipeline "channel-tracker" ^ChannelHandler (channel-tracking-handler chan-group)) (pipeline-builder pipeline))) +(defn- validate-existing-channel + [existing-channel] + (when (some? existing-channel) + (when-not (instance? java.nio.channels.ServerSocketChannel existing-channel) + (throw (IllegalArgumentException. + (str "The existing-channel type is not supported: " (pr-str existing-channel))))) + (when (nil? (.getLocalAddress ^java.nio.channels.ServerSocketChannel existing-channel)) + (throw (IllegalArgumentException. + (str "The existing-channel is not bound: " (pr-str existing-channel))))))) + +(defn- wrapping-channel-factory + ^ChannelFactory [^java.nio.channels.ServerSocketChannel channel] + (proxy [ChannelFactory] [] + (newChannel [] + (NioServerSocketChannel. channel)))) + (defn ^:no-doc start-server ([pipeline-builder ssl-context @@ -1685,11 +1702,13 @@ bootstrap-transform on-close ^SocketAddress socket-address + existing-channel transport shutdown-timeout] :or {shutdown-timeout default-shutdown-timeout} :as opts}] (ensure-transport-available! transport) + (validate-existing-channel existing-channel) (let [num-cores (.availableProcessors (Runtime/getRuntime)) num-threads (* 2 num-cores) thread-factory (enumerating-thread-factory "aleph-server-pool" false) @@ -1715,7 +1734,8 @@ (.option ChannelOption/SO_REUSEADDR true) (.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) (.group group) - (.channel channel-class) + (cond-> (nil? existing-channel) (.channel channel-class)) + (cond-> (some? existing-channel) (.channelFactory (wrapping-channel-factory existing-channel))) ;;TODO: add a server (.handler) call to the bootstrap, for logging or something (.childHandler (pipeline-initializer pipeline-builder)) (.childOption ChannelOption/SO_REUSEADDR true) @@ -1723,7 +1743,11 @@ bootstrap-transform) ^ServerSocketChannel - ch (-> b (.bind socket-address) .sync .channel)] + ch (-> (if (nil? existing-channel) + (.bind b socket-address) + (.register b)) + .sync + .channel)] (reify Closeable @@ -1731,7 +1755,9 @@ (when (compare-and-set! closed? false true) ;; This is the three step closing sequence: ;; 1. Stop listening to incoming requests - (-> ch .close .sync) + (if (nil? existing-channel) + (-> ch .close .sync) + (-> ch .deregister .sync)) (-> (if (pos? shutdown-timeout) ;; 2. Wait for in-flight requests to stop processing within the supplied timeout ;; interval. @@ -1759,7 +1785,8 @@ (port [_] (-> ch .localAddress .getPort)) (wait-for-close [_] - (-> ch .closeFuture .await) + (when (nil? existing-channel) + (-> ch .closeFuture .await)) (-> group .terminationFuture .await) nil))) diff --git a/src/aleph/tcp.clj b/src/aleph/tcp.clj index c7d6b99f..5c064378 100644 --- a/src/aleph/tcp.clj +++ b/src/aleph/tcp.clj @@ -74,6 +74,7 @@ | --- | --- | `port` | the port the server will bind to. If `0`, the server will bind to a random port. | `socket-address` | a `java.net.SocketAddress` specifying both the port and interface to bind to. + | `existing-channel` | a pre-bound `java.nio.channels.ServerSocketChannel` for the server to use rather than opening and binding its own server-socket. It won't be closed by the server. Possibly obtained from `System/inheritedChannel`. | | `ssl-context` | an `io.netty.handler.ssl.SslContext` object or a map of SSL context options (see `aleph.netty/ssl-server-context` for more details). If given, the server will only accept SSL connections and call the handler once the SSL session has been successfully established. If a self-signed certificate is all that's required, `(aleph.netty/self-signed-ssl-context)` will suffice. | `bootstrap-transform` | a function that takes an `io.netty.bootstrap.ServerBootstrap` object, which represents the server, and modifies it. | `pipeline-transform` | a function that takes an `io.netty.channel.ChannelPipeline` object, which represents a connection, and modifies it. @@ -81,7 +82,7 @@ | `shutdown-timeout` | interval in seconds within which in-flight requests must be processed, defaults to 15 seconds. A value of 0 bypasses waiting entirely. | `transport` | the transport to use, one of `:nio`, `:epoll`, `:kqueue` or `:io-uring` (defaults to `:nio`)." [handler - {:keys [port socket-address ssl-context bootstrap-transform pipeline-transform epoll? + {:keys [port socket-address existing-channel ssl-context bootstrap-transform pipeline-transform epoll? shutdown-timeout transport] :or {bootstrap-transform identity pipeline-transform identity @@ -101,9 +102,10 @@ (server-channel-handler handler options)) (pipeline-transform pipeline)) :bootstrap-transform bootstrap-transform - :socket-address (if socket-address - socket-address - (InetSocketAddress. port)) + :socket-address (cond + socket-address socket-address + (nil? existing-channel) (InetSocketAddress. port)) + :existing-channel existing-channel :transport (netty/determine-transport transport epoll?) :shutdown-timeout shutdown-timeout}))) diff --git a/test/aleph/http_test.clj b/test/aleph/http_test.clj index f04b6d2a..a6e0f873 100644 --- a/test/aleph/http_test.clj +++ b/test/aleph/http_test.clj @@ -7,7 +7,7 @@ [aleph.resource-leak-detector] [aleph.ssl :as test-ssl] [aleph.tcp :as tcp] - [aleph.testutils :refer [str=]] + [aleph.testutils :refer [str= bound-channel]] [clj-commons.byte-streams :as bs] [clojure.java.io :as io] [clojure.string :as str] @@ -47,7 +47,10 @@ (java.io Closeable File) - (java.net UnknownHostException) + (java.net + BindException + UnknownHostException) + (java.nio.channels ServerSocketChannel) (java.util.concurrent SynchronousQueue ThreadPoolExecutor @@ -1747,6 +1750,40 @@ (catch Exception e (is (instance? ProxyConnectException e))))))))) +(deftest test-existing-channel + (testing "validation" + (is (thrown-with-msg? Exception #"existing-channel" + (with-http-servers basic-handler {:existing-channel "a string"}))) + (with-open [unbound-channel (ServerSocketChannel/open)] + (is (thrown-with-msg? Exception #"existing-channel" + (with-http-servers basic-handler {:existing-channel unbound-channel}))))) + + (testing "with a bound server-socket channel" + (testing "- unknown to the server" + (with-open [_ (bound-channel port)] + ;; The port is already bound by a server-socket channel, but + ;; we are not telling Aleph about it, so we should get a + ;; BindException when Aleph tries to bind to the same port. + (is (thrown? BindException + (with-http-servers basic-handler {}))))) + + (testing "- known to the server" + (with-open [channel (bound-channel port)] + ;; This time, we shouldn't get a BindException, because we are + ;; telling Aleph to use an existing server-socket channel, + ;; which should be already bound, so Aleph doesn't try to bind. + (with-http-servers basic-handler {:existing-channel channel} + (is (= 200 (:status @(http-get "/string"))))) + ;; The existing channel should not be closed on a server shutdown, + ;; because that channel is not owned by the server. + (is (.isOpen channel))))) + + (testing "the :port option is not required when :existing-channel is passed" + (with-redefs [http-server-options (dissoc http-server-options :port)] + (with-open [channel (bound-channel port)] + (with-http1-server basic-handler {:existing-channel channel} + (is (= 200 (:status @(http-get "/string"))))))))) + (deftest ^:leak test-leak-in-raw-stream-handler ;; NOTE: Expecting 2 leaks because `with-raw-handler` will run its body for both http1 and ;; http2. It would be nicer to put this assertion into the body but the http1 server seems to diff --git a/test/aleph/tcp_test.clj b/test/aleph/tcp_test.clj index 11e26881..a871d16e 100644 --- a/test/aleph/tcp_test.clj +++ b/test/aleph/tcp_test.clj @@ -3,6 +3,7 @@ [aleph.netty :as netty] [aleph.resource-leak-detector] [aleph.tcp :as tcp] + [aleph.testutils :refer [bound-channel]] [clj-commons.byte-streams :as bs] [clojure.test :refer [deftest testing is]] [manifold.stream :as s])) @@ -55,4 +56,13 @@ (catch Exception _ (is (not (netty/io-uring-available?))))))) +(deftest test-existing-channel + (testing "the :port option is not required when :existing-channel is passed" + (let [port 8083] + (with-open [channel (bound-channel port)] + (with-server (tcp/start-server echo-handler {:existing-channel channel :shutdown-timeout 0}) + (let [c @(tcp/client {:host "localhost" :port port})] + (s/put! c "foo") + (is (= "foo" (bs/to-string @(s/take! c)))))))))) + (aleph.resource-leak-detector/instrument-tests!) diff --git a/test/aleph/testutils.clj b/test/aleph/testutils.clj index f225c2c0..06e8b695 100644 --- a/test/aleph/testutils.clj +++ b/test/aleph/testutils.clj @@ -1,8 +1,15 @@ (ns aleph.testutils - (:import (io.netty.util AsciiString))) + (:import (io.netty.util AsciiString) + (java.net InetSocketAddress) + (java.nio.channels ServerSocketChannel))) (defn str= "AsciiString-aware equals" [^CharSequence x ^CharSequence y] (AsciiString/contentEquals x y)) +(defn bound-channel + "Returns a new server-socket channel bound to a `port`." + ^ServerSocketChannel [port] + (doto (ServerSocketChannel/open) + (.bind (InetSocketAddress. port))))