From b39dcf8d331334c0e25b0aeaf77276b2d84e46bb Mon Sep 17 00:00:00 2001 From: "william.mo" Date: Wed, 6 Aug 2025 19:55:35 -0700 Subject: [PATCH 01/11] Create a sse plugin to support SSE(Server-Sent Events) --- apisix/plugins/sse.lua | 104 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 apisix/plugins/sse.lua diff --git a/apisix/plugins/sse.lua b/apisix/plugins/sse.lua new file mode 100644 index 000000000000..a33ec252d8c0 --- /dev/null +++ b/apisix/plugins/sse.lua @@ -0,0 +1,104 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") +local plugin = require("apisix.plugin") + +local plugin_name = "sse" + +local schema = { + type = "object", + properties = { + proxy_read_timeout = { + type = "integer", + description = "Sets the timeout for reading a response from the proxied server, in seconds. A value of 0 turns off this timeout.", + default = 3600, -- 1 hour + minimum = 0, + }, + override_content_type = { + type = "boolean", + description = "Whether to force the Content-Type header to 'text/event-stream'.", + default = true, + }, + connection_header = { + type = "string", + enum = { "keep-alive", "close" }, + description = "Value for the 'Connection' response header.", + default = "keep-alive", + }, + cache_control = { + type = "string", + description = "Value for the 'Cache-Control' response header.", + default = "no-cache", + } + }, +} + +local _M = { + version = 0.1, + priority = 1005, -- Runs after authentication but before most other plugins. + name = plugin_name, + schema = schema, + stream_only = false, +} + +function _M.check_schema(conf) + return core.schema.check(schema, conf) +end + +-- The rewrite phase is executed before the request is forwarded to the upstream. +-- This is the correct place to set Nginx variables that control proxy behavior. +function _M.rewrite(conf, ctx) + core.log.debug("sse plugin rewrite phase") + + -- Disable response buffering from the proxied server. + -- This is the key to making SSE work, as it allows data to be sent + -- to the client as soon as it's received from the upstream. + core.ctx.set_var(ctx, "proxy_buffering", "off") + core.log.debug("sse plugin set proxy_buffering to off") + + -- Also disable request buffering. While not strictly required for SSE + -- (which is server-to-client), it's good practice for streaming APIs. + core.ctx.set_var(ctx, "proxy_request_buffering", "off") + core.log.debug("sse plugin set proxy_request_buffering to off") + + -- Set a long read timeout, as SSE connections are long-lived. + -- The default is 60s, which would prematurely close the connection. + local timeout_str = conf.proxy_read_timeout .. "s" + core.ctx.set_var(ctx, "proxy_read_timeout", timeout_str) + core.log.debug("sse plugin set proxy_read_timeout to ", timeout_str) +end + +-- The header_filter phase is executed after the response headers are received +-- from the upstream and before they are sent to the client. +function _M.header_filter(conf, ctx) + core.log.debug("sse plugin header_filter phase") + + core.response.set_header("X-Accel-Buffering", "no") + core.log.debug("sse plugin set X-Accel-Buffering to no") + core.response.set_header("Cache-Control", conf.cache_control) + core.log.debug("sse plugin set Cache-Control to ", conf.cache_control) + core.response.set_header("Connection", conf.connection_header) + core.log.debug("sse plugin set Connection to ", conf.connection_header) + + if conf.override_content_type then + core.response.set_header("Content-Type", "text/event-stream; charset=utf-8") + core.log.debug("sse plugin set Content-Type to text/event-stream; charset=utf-8") + end +end + +return _M \ No newline at end of file From 2b25e0a6573cc5a55917c03d7de04287050c0d7f Mon Sep 17 00:00:00 2001 From: "william.mo" Date: Wed, 6 Aug 2025 20:25:52 -0700 Subject: [PATCH 02/11] add the doc and test file --- docs/en/latest/sse.md | 104 +++++++++++++++++++++++++++ t/plugin/sse.t | 162 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 266 insertions(+) create mode 100644 docs/en/latest/sse.md create mode 100644 t/plugin/sse.t diff --git a/docs/en/latest/sse.md b/docs/en/latest/sse.md new file mode 100644 index 000000000000..a5e2eb974fe9 --- /dev/null +++ b/docs/en/latest/sse.md @@ -0,0 +1,104 @@ +--- +title: sse +--- + +# Summary + +The `sse` plugin enables support for **Server-Sent Events (SSE)** by configuring APISIX to correctly proxy long-lived HTTP connections used in streaming scenarios. + +SSE allows servers to push updates to clients over a single HTTP connection using the `text/event-stream` content type. This plugin ensures buffering is disabled, proper timeouts are set, and necessary response headers are applied. + +# Attributes + +| Name | Type | Default | Required | Description | +| ----------------------- | ------- | ------------ | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------- | +| `proxy_read_timeout` | Integer | `3600` | False | Timeout in seconds for reading a response from the upstream server. A value of `0` disables the timeout. This should be long for SSE connections. | +| `override_content_type` | Boolean | `true` | False | Whether to force the `Content-Type` header to `text/event-stream; charset=utf-8`. | +| `connection_header` | String | `keep-alive` | False | Sets the `Connection` response header. | +| `cache_control` | String | `no-cache` | False | Sets the `Cache-Control` response header. | + +# How It Works + +When enabled, the plugin makes the following adjustments: + +- Disables response and request buffering using NGINX variables. +- Sets a long read timeout (`proxy_read_timeout`) to support streaming. +- Optionally overrides the `Content-Type` to `text/event-stream; charset=utf-8`. +- Sets headers necessary for SSE: + - `X-Accel-Buffering: no` + - `Connection` + - `Cache-Control` + +These settings are required to ensure that the SSE connection remains open and data can be streamed to the client in real time. + +# Example Configuration + +```json +{ + "name": "sse", + "priority": 1005, + "config": { + "proxy_read_timeout": 7200, + "override_content_type": true, + "connection_header": "keep-alive", + "cache_control": "no-cache" + } +} +``` + + +# Enabling the Plugin on a Route + +``` +curl http://127.0.0.1:9180/apisix/admin/routes/1 -X PUT -d ' +{ + "uri": "/sse", + "plugins": { + "sse": { + "proxy_read_timeout": 7200, + "override_content_type": true, + "connection_header": "keep-alive", + "cache_control": "no-cache" + } + }, + "upstream": { + "type": "roundrobin", + "nodes": { + "httpbin.org:80": 1 + } + } +}' +``` + +This example enables the plugin on the /sse route and sets a 2-hour timeout for SSE connections, ensuring the correct headers and proxy behavior are applied. + + +# Notes + +This plugin is only relevant for routes that serve SSE (e.g., real-time feeds, logs, event notifications). + +SSE is a one-way communication protocol (server → client). This plugin does not support bidirectional protocols like WebSocket. + +If your upstream already sets the correct Content-Type, you can disable the override using "override_content_type": false. + +Ensure your upstream service flushes events frequently to keep the SSE connection alive. + +# Disabling the Plugin + +To disable the sse plugin on a route: + +``` +curl http://127.0.0.1:9180/apisix/admin/routes/1 -X PATCH -d ' +{ + "plugins": { + "sse": null + } +}' +``` + +# Changelog + +| Version | Description | +| ------- | ----------------------------------------------- | +| 0.1 | Initial version of the plugin with SSE support. | + diff --git a/t/plugin/sse.t b/t/plugin/sse.t new file mode 100644 index 000000000000..f52361e8f82f --- /dev/null +++ b/t/plugin/sse.t @@ -0,0 +1,162 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +use t::APISIX 'no_plan'; + +run_tests(); + +__DATA__ + +=== TEST 1: sse plugin with default configuration +--- config +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + local route_conf = { + uri = "/sse", + plugins = { + sse = {} + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } + } + t.add_route("/t/route/1", route_conf) + + core.response.exit(200) + } +} +--- server_config +location /sse { + content_by_lua_block { + ngx.header["Content-Type"] = "text/plain" + ngx.say("data: hello from upstream") + } +} +--- request +GET /sse +--- response_headers +Content-Type: text/event-stream; charset=utf-8 +X-Accel-Buffering: no +Cache-Control: no-cache +Connection: keep-alive +--- response_body +data: hello from upstream +--- error_code: 200 +--- no_error_log +[error] + + + +=== TEST 2: sse plugin with override_content_type = false +--- config +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + local route_conf = { + uri = "/sse_no_override", + plugins = { + sse = { + override_content_type = false + } + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } + } + t.add_route("/t/route/2", route_conf) + + core.response.exit(200) + } +} +--- server_config +location /sse_no_override { + content_by_lua_block { + ngx.header["Content-Type"] = "application/json" + ngx.say("{\"message\": \"hello\"}") + } +} +--- request +GET /sse_no_override +--- response_headers +Content-Type: application/json +X-Accel-Buffering: no +Cache-Control: no-cache +Connection: keep-alive +--- response_body +{"message": "hello"} +--- error_code: 200 +--- no_error_log +[error] + + + +=== TEST 3: sse plugin with custom connection and cache-control headers +--- config +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + local route_conf = { + uri = "/sse_custom", + plugins = { + sse = { + connection_header = "close", + cache_control = "public, max-age=3600" + } + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } + } + t.add_route("/t/route/3", route_conf) + + core.response.exit(200) + } +} +--- server_config +location /sse_custom { + content_by_lua_block { + ngx.header["Content-Type"] = "text/plain" + ngx.say("data: hello from upstream") + } +} +--- request +GET /sse_custom +--- response_headers +Content-Type: text/event-stream; charset=utf-8 +X-Accel-Buffering: no +Cache-Control: public, max-age=3600 +Connection: close +--- response_body +data: hello from upstream +--- error_code: 200 +--- no_error_log +[error] \ No newline at end of file From bb65d46f445f4e48991264ebbdc7dbed68a9c443 Mon Sep 17 00:00:00 2001 From: "william.mo" Date: Wed, 6 Aug 2025 20:37:45 -0700 Subject: [PATCH 03/11] update the unit test --- apisix/plugins/sse.lua | 1 - t/plugin/sse.t | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/apisix/plugins/sse.lua b/apisix/plugins/sse.lua index a33ec252d8c0..a19cc70874c3 100644 --- a/apisix/plugins/sse.lua +++ b/apisix/plugins/sse.lua @@ -36,7 +36,6 @@ local schema = { }, connection_header = { type = "string", - enum = { "keep-alive", "close" }, description = "Value for the 'Connection' response header.", default = "keep-alive", }, diff --git a/t/plugin/sse.t b/t/plugin/sse.t index f52361e8f82f..b0038121214c 100644 --- a/t/plugin/sse.t +++ b/t/plugin/sse.t @@ -125,8 +125,8 @@ location /t { uri = "/sse_custom", plugins = { sse = { - connection_header = "close", - cache_control = "public, max-age=3600" + connection_header = "Upgrade", + cache_control = "public, max-age=86400" } }, upstream = { @@ -153,8 +153,8 @@ GET /sse_custom --- response_headers Content-Type: text/event-stream; charset=utf-8 X-Accel-Buffering: no -Cache-Control: public, max-age=3600 -Connection: close +Cache-Control: public, max-age=86400 +Connection: Upgrade --- response_body data: hello from upstream --- error_code: 200 From 3c08d524b69bb581bbfb921fd3edf93ffd7daedb Mon Sep 17 00:00:00 2001 From: "william.mo" Date: Wed, 6 Aug 2025 20:56:19 -0700 Subject: [PATCH 04/11] fix lint --- apisix/plugins/sse.lua | 6 +++--- docs/en/latest/sse.md | 5 ++--- t/plugin/sse.t | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/apisix/plugins/sse.lua b/apisix/plugins/sse.lua index a19cc70874c3..0fae49132155 100644 --- a/apisix/plugins/sse.lua +++ b/apisix/plugins/sse.lua @@ -16,7 +16,6 @@ -- local core = require("apisix.core") -local plugin = require("apisix.plugin") local plugin_name = "sse" @@ -25,7 +24,8 @@ local schema = { properties = { proxy_read_timeout = { type = "integer", - description = "Sets the timeout for reading a response from the proxied server, in seconds. A value of 0 turns off this timeout.", + description = "Sets the timeout for reading a response from the proxied server, " .. + "in seconds. A value of 0 turns off this timeout.", default = 3600, -- 1 hour minimum = 0, }, @@ -100,4 +100,4 @@ function _M.header_filter(conf, ctx) end end -return _M \ No newline at end of file +return _M diff --git a/docs/en/latest/sse.md b/docs/en/latest/sse.md index a5e2eb974fe9..b45cdbc57453 100644 --- a/docs/en/latest/sse.md +++ b/docs/en/latest/sse.md @@ -73,7 +73,7 @@ curl http://127.0.0.1:9180/apisix/admin/routes/1 -X PUT -d ' This example enables the plugin on the /sse route and sets a 2-hour timeout for SSE connections, ensuring the correct headers and proxy behavior are applied. -# Notes +## Notes This plugin is only relevant for routes that serve SSE (e.g., real-time feeds, logs, event notifications). @@ -83,7 +83,7 @@ If your upstream already sets the correct Content-Type, you can disable the over Ensure your upstream service flushes events frequently to keep the SSE connection alive. -# Disabling the Plugin +## Disabling the Plugin To disable the sse plugin on a route: @@ -101,4 +101,3 @@ curl http://127.0.0.1:9180/apisix/admin/routes/1 -X PATCH -d ' | Version | Description | | ------- | ----------------------------------------------- | | 0.1 | Initial version of the plugin with SSE support. | - diff --git a/t/plugin/sse.t b/t/plugin/sse.t index b0038121214c..7c2b8bcb03b4 100644 --- a/t/plugin/sse.t +++ b/t/plugin/sse.t @@ -159,4 +159,4 @@ Connection: Upgrade data: hello from upstream --- error_code: 200 --- no_error_log -[error] \ No newline at end of file +[error] From 59f68522d670e5e958f592460c2f4cabeddf0891 Mon Sep 17 00:00:00 2001 From: "william.mo" Date: Wed, 6 Aug 2025 21:24:18 -0700 Subject: [PATCH 05/11] update the priority to 1004, which there is no use --- apisix/plugins/sse.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/plugins/sse.lua b/apisix/plugins/sse.lua index 0fae49132155..635995a1e19f 100644 --- a/apisix/plugins/sse.lua +++ b/apisix/plugins/sse.lua @@ -49,7 +49,7 @@ local schema = { local _M = { version = 0.1, - priority = 1005, -- Runs after authentication but before most other plugins. + priority = 1004, -- Runs after authentication but before most other plugins. name = plugin_name, schema = schema, stream_only = false, From d71dfe45368962f6346c7e0a9a23a084ac7e4443 Mon Sep 17 00:00:00 2001 From: "william.mo" Date: Wed, 6 Aug 2025 21:33:54 -0700 Subject: [PATCH 06/11] Trigger rebuild From 64bf92aa008f4c5d9777bc610d65a76a2bd82f6a Mon Sep 17 00:00:00 2001 From: William Mo Date: Thu, 7 Aug 2025 10:52:44 -0700 Subject: [PATCH 07/11] Update sse.t update the comments --- t/plugin/sse.t | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/t/plugin/sse.t b/t/plugin/sse.t index 7c2b8bcb03b4..5e6f783c4c13 100644 --- a/t/plugin/sse.t +++ b/t/plugin/sse.t @@ -1,19 +1,20 @@ --- --- Licensed to the Apache Software Foundation (ASF) under one or more --- contributor license agreements. See the NOTICE file distributed with --- this work for additional information regarding copyright ownership. --- The ASF licenses this file to You under the Apache License, Version 2.0 --- (the "License"); you may not use this file except in compliance with --- the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. --- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + use t::APISIX 'no_plan'; run_tests(); From bd100f57b0143b7b8b32ca1314863986bb3622d5 Mon Sep 17 00:00:00 2001 From: "william.mo" Date: Sat, 9 Aug 2025 11:03:16 -0700 Subject: [PATCH 08/11] update test file --- t/plugin/sse.t | 118 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 78 insertions(+), 40 deletions(-) diff --git a/t/plugin/sse.t b/t/plugin/sse.t index 7c2b8bcb03b4..34a98e9f72f0 100644 --- a/t/plugin/sse.t +++ b/t/plugin/sse.t @@ -1,19 +1,19 @@ --- --- Licensed to the Apache Software Foundation (ASF) under one or more --- contributor license agreements. See the NOTICE file distributed with --- this work for additional information regarding copyright ownership. --- The ASF licenses this file to You under the Apache License, Version 2.0 --- (the "License"); you may not use this file except in compliance with --- the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. --- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + use t::APISIX 'no_plan'; run_tests(); @@ -27,6 +27,12 @@ location /t { local core = require("apisix.core") local t = require("t.test_funcs") + -- upstream returns SSE data + t.set_upstream(function() + ngx.header["Content-Type"] = "text/plain" + ngx.say("data: hello from upstream") + end) + local route_conf = { uri = "/sse", plugins = { @@ -40,17 +46,9 @@ location /t { } } t.add_route("/t/route/1", route_conf) - core.response.exit(200) } } ---- server_config -location /sse { - content_by_lua_block { - ngx.header["Content-Type"] = "text/plain" - ngx.say("data: hello from upstream") - } -} --- request GET /sse --- response_headers @@ -73,6 +71,12 @@ location /t { local core = require("apisix.core") local t = require("t.test_funcs") + -- upstream returns JSON + t.set_upstream(function() + ngx.header["Content-Type"] = "application/json" + ngx.say("{\"message\": \"hello\"}") + end) + local route_conf = { uri = "/sse_no_override", plugins = { @@ -88,17 +92,9 @@ location /t { } } t.add_route("/t/route/2", route_conf) - core.response.exit(200) } } ---- server_config -location /sse_no_override { - content_by_lua_block { - ngx.header["Content-Type"] = "application/json" - ngx.say("{\"message\": \"hello\"}") - } -} --- request GET /sse_no_override --- response_headers @@ -121,6 +117,12 @@ location /t { local core = require("apisix.core") local t = require("t.test_funcs") + -- upstream returns SSE data + t.set_upstream(function() + ngx.header["Content-Type"] = "text/plain" + ngx.say("data: hello from upstream") + end) + local route_conf = { uri = "/sse_custom", plugins = { @@ -137,17 +139,9 @@ location /t { } } t.add_route("/t/route/3", route_conf) - core.response.exit(200) } } ---- server_config -location /sse_custom { - content_by_lua_block { - ngx.header["Content-Type"] = "text/plain" - ngx.say("data: hello from upstream") - } -} --- request GET /sse_custom --- response_headers @@ -160,3 +154,47 @@ data: hello from upstream --- error_code: 200 --- no_error_log [error] + + + +=== TEST 4: route without SSE plugin (negative case) +--- config +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + -- upstream returns plain text + t.set_upstream(function() + ngx.header["Content-Type"] = "text/plain" + ngx.say("data: hello from upstream") + end) + + local route_conf = { + uri = "/no_sse", + plugins = { + -- No SSE plugin here + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } + } + t.add_route("/t/route/4", route_conf) + core.response.exit(200) + } +} +--- request +GET /no_sse +--- response_headers_unlike +Content-Type: text/event-stream +X-Accel-Buffering: .* +Cache-Control: .* +Connection: .* +--- response_body +data: hello from upstream +--- error_code: 200 +--- no_error_log +[error] From 2aaee3be8de5dc78939b00af716541b159eae545 Mon Sep 17 00:00:00 2001 From: "william.mo" Date: Sat, 9 Aug 2025 11:25:42 -0700 Subject: [PATCH 09/11] update test file --- t/plugin/sse.t | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/t/plugin/sse.t b/t/plugin/sse.t index 83cf6b9e173c..7c0170009014 100644 --- a/t/plugin/sse.t +++ b/t/plugin/sse.t @@ -7,23 +7,6 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, -# either express or implied. See the License for the -# specific language governing permissions and limitations under the License. -# - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,6 +16,11 @@ use t::APISIX 'no_plan'; +repeat_each(1); +no_long_string(); +no_root_location(); +no_shuffle(); + run_tests(); __DATA__ From ae47ddff52b389824ae7d0ac09b365c80f0183ad Mon Sep 17 00:00:00 2001 From: "william.mo" Date: Sat, 9 Aug 2025 13:49:17 -0700 Subject: [PATCH 10/11] update the test case --- t/plugin/sse.t | 379 ++++++++++++++++++++++++++++++------------------- 1 file changed, 229 insertions(+), 150 deletions(-) diff --git a/t/plugin/sse.t b/t/plugin/sse.t index 7c0170009014..00e7fd691661 100644 --- a/t/plugin/sse.t +++ b/t/plugin/sse.t @@ -18,188 +18,267 @@ use t::APISIX 'no_plan'; repeat_each(1); no_long_string(); -no_root_location(); no_shuffle(); +log_level('info'); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } + + if (!$block->no_error_log && !$block->error_log) { + $block->set_value("no_error_log", "[error]"); + } +}); run_tests(); __DATA__ -=== TEST 1: sse plugin with default configuration +=== TEST 1: add upstream and route for sse plugin test --- config -location /t { - content_by_lua_block { - local core = require("apisix.core") - local t = require("t.test_funcs") - - -- upstream returns SSE data - t.set_upstream(function() - ngx.header["Content-Type"] = "text/plain" - ngx.say("data: hello from upstream") - end) - - local route_conf = { - uri = "/sse", - plugins = { - sse = {} - }, - upstream = { - type = "roundrobin", - nodes = { - [t.upstream_server_host_1] = 1, - } - } + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/upstreams/1', + ngx.HTTP_PUT, + [[{ + "type": "roundrobin", + "nodes": { + "127.0.0.1:1980": 1 + } + }]] + ) + + if code >= 300 then + ngx.status = code + return + end + + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "uri": "/sse_test", + "plugins": { + "sse": {} + }, + "upstream_id": "1" + }]] + ) + + if code >= 300 then + ngx.status = code + return + end + + ngx.say("passed") } - t.add_route("/t/route/1", route_conf) - core.response.exit(200) } -} ---- request -GET /sse ---- response_headers -Content-Type: text/event-stream; charset=utf-8 -X-Accel-Buffering: no -Cache-Control: no-cache -Connection: keep-alive --- response_body -data: hello from upstream ---- error_code: 200 ---- no_error_log -[error] +passed -=== TEST 2: sse plugin with override_content_type = false +=== TEST 2: test sse plugin with default configuration --- config -location /t { - content_by_lua_block { - local core = require("apisix.core") - local t = require("t.test_funcs") - - -- upstream returns JSON - t.set_upstream(function() - ngx.header["Content-Type"] = "application/json" - ngx.say("{\"message\": \"hello\"}") - end) - - local route_conf = { - uri = "/sse_no_override", - plugins = { - sse = { - override_content_type = false - } - }, - upstream = { - type = "roundrobin", - nodes = { - [t.upstream_server_host_1] = 1, - } - } + location /t { + content_by_lua_block { + local http = require "resty.http" + local httpc = http.new() + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/sse_test" + local res, err = httpc:request_uri(uri, { + method = "GET", + }) + + if not res then + ngx.log(ngx.ERR, "request failed: ", err) + ngx.status = 500 + return + end + + ngx.status = res.status + + -- Check headers + local content_type = res.headers["Content-Type"] + local x_accel_buffering = res.headers["X-Accel-Buffering"] + local cache_control = res.headers["Cache-Control"] + local connection = res.headers["Connection"] + + ngx.say("Status: ", res.status) + ngx.say("Content-Type: ", content_type or "nil") + ngx.say("X-Accel-Buffering: ", x_accel_buffering or "nil") + ngx.say("Cache-Control: ", cache_control or "nil") + ngx.say("Connection: ", connection or "nil") } - t.add_route("/t/route/2", route_conf) - core.response.exit(200) } -} ---- request -GET /sse_no_override ---- response_headers -Content-Type: application/json +--- response_body_like +Status: 200 +Content-Type: text/event-stream; charset=utf-8 X-Accel-Buffering: no Cache-Control: no-cache Connection: keep-alive ---- response_body -{"message": "hello"} ---- error_code: 200 ---- no_error_log -[error] -=== TEST 3: sse plugin with custom connection and cache-control headers +=== TEST 3: test sse plugin with override_content_type = false --- config -location /t { - content_by_lua_block { - local core = require("apisix.core") - local t = require("t.test_funcs") - - -- upstream returns SSE data - t.set_upstream(function() - ngx.header["Content-Type"] = "text/plain" - ngx.say("data: hello from upstream") - end) - - local route_conf = { - uri = "/sse_custom", - plugins = { - sse = { - connection_header = "Upgrade", - cache_control = "public, max-age=86400" - } - }, - upstream = { - type = "roundrobin", - nodes = { - [t.upstream_server_host_1] = 1, - } - } + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "uri": "/sse_test", + "plugins": { + "sse": { + "override_content_type": false + } + }, + "upstream_id": "1" + }]] + ) + + if code >= 300 then + ngx.status = code + return + end + + local http = require "resty.http" + local httpc = http.new() + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/sse_test" + local res, err = httpc:request_uri(uri, { + method = "GET", + }) + + if not res then + ngx.log(ngx.ERR, "request failed: ", err) + ngx.status = 500 + return + end + + ngx.status = res.status + + -- The upstream (port 1980) should return HTML by default + -- So Content-Type should NOT be text/event-stream + local content_type = res.headers["Content-Type"] + local x_accel_buffering = res.headers["X-Accel-Buffering"] + + ngx.say("Status: ", res.status) + ngx.say("Content-Type: ", content_type or "nil") + ngx.say("X-Accel-Buffering: ", x_accel_buffering or "nil") } - t.add_route("/t/route/3", route_conf) - core.response.exit(200) } -} ---- request -GET /sse_custom ---- response_headers -Content-Type: text/event-stream; charset=utf-8 +--- response_body_like +Status: 200 +Content-Type: (?!text/event-stream).* X-Accel-Buffering: no -Cache-Control: public, max-age=86400 + + + +=== TEST 4: test sse plugin with custom headers +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "uri": "/sse_test", + "plugins": { + "sse": { + "connection_header": "Upgrade", + "cache_control": "public, max-age=3600" + } + }, + "upstream_id": "1" + }]] + ) + + if code >= 300 then + ngx.status = code + return + end + + local http = require "resty.http" + local httpc = http.new() + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/sse_test" + local res, err = httpc:request_uri(uri, { + method = "GET", + }) + + if not res then + ngx.log(ngx.ERR, "request failed: ", err) + ngx.status = 500 + return + end + + ngx.status = res.status + + local cache_control = res.headers["Cache-Control"] + local connection = res.headers["Connection"] + + ngx.say("Status: ", res.status) + ngx.say("Cache-Control: ", cache_control or "nil") + ngx.say("Connection: ", connection or "nil") + } + } +--- response_body_like +Status: 200 +Cache-Control: public, max-age=3600 Connection: Upgrade + + + +=== TEST 5: test sse plugin schema validation +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.sse") + + -- Test valid config + local ok, err = plugin.check_schema({}) + if not ok then + ngx.say("Default schema validation failed: ", err) + return + end + + -- Test invalid proxy_read_timeout + local ok, err = plugin.check_schema({ + proxy_read_timeout = -1 + }) + if ok then + ngx.say("Schema validation should have failed for negative timeout") + return + end + + -- Test valid zero timeout + local ok, err = plugin.check_schema({ + proxy_read_timeout = 0 + }) + if not ok then + ngx.say("Zero timeout validation failed: ", err) + return + end + + ngx.say("Schema validation tests passed") + } + } --- response_body -data: hello from upstream ---- error_code: 200 ---- no_error_log -[error] +Schema validation tests passed -=== TEST 4: route without SSE plugin (negative case) +=== TEST 6: cleanup --- config -location /t { - content_by_lua_block { - local core = require("apisix.core") - local t = require("t.test_funcs") - - -- upstream returns plain text - t.set_upstream(function() - ngx.header["Content-Type"] = "text/plain" - ngx.say("data: hello from upstream") - end) - - local route_conf = { - uri = "/no_sse", - plugins = { - -- No SSE plugin here - }, - upstream = { - type = "roundrobin", - nodes = { - [t.upstream_server_host_1] = 1, - } - } + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', ngx.HTTP_DELETE) + local code, body = t('/apisix/admin/upstreams/1', ngx.HTTP_DELETE) + ngx.say("passed") } - t.add_route("/t/route/4", route_conf) - core.response.exit(200) } -} ---- request -GET /no_sse ---- response_headers_unlike -Content-Type: text/event-stream -X-Accel-Buffering: .* -Cache-Control: .* -Connection: .* --- response_body -data: hello from upstream ---- error_code: 200 ---- no_error_log -[error] +passed \ No newline at end of file From b4a7ff439261d30361c227d9c2924ffa42184f16 Mon Sep 17 00:00:00 2001 From: "william.mo" Date: Sat, 9 Aug 2025 13:51:38 -0700 Subject: [PATCH 11/11] rever back --- t/plugin/sse.t | 379 +++++++++++++++++++------------------------------ 1 file changed, 150 insertions(+), 229 deletions(-) diff --git a/t/plugin/sse.t b/t/plugin/sse.t index 00e7fd691661..7c0170009014 100644 --- a/t/plugin/sse.t +++ b/t/plugin/sse.t @@ -18,267 +18,188 @@ use t::APISIX 'no_plan'; repeat_each(1); no_long_string(); +no_root_location(); no_shuffle(); -log_level('info'); - -add_block_preprocessor(sub { - my ($block) = @_; - - if (!$block->request) { - $block->set_value("request", "GET /t"); - } - - if (!$block->no_error_log && !$block->error_log) { - $block->set_value("no_error_log", "[error]"); - } -}); run_tests(); __DATA__ -=== TEST 1: add upstream and route for sse plugin test +=== TEST 1: sse plugin with default configuration --- config - location /t { - content_by_lua_block { - local t = require("lib.test_admin").test - local code, body = t('/apisix/admin/upstreams/1', - ngx.HTTP_PUT, - [[{ - "type": "roundrobin", - "nodes": { - "127.0.0.1:1980": 1 - } - }]] - ) - - if code >= 300 then - ngx.status = code - return - end - - local code, body = t('/apisix/admin/routes/1', - ngx.HTTP_PUT, - [[{ - "uri": "/sse_test", - "plugins": { - "sse": {} - }, - "upstream_id": "1" - }]] - ) - - if code >= 300 then - ngx.status = code - return - end - - ngx.say("passed") +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + -- upstream returns SSE data + t.set_upstream(function() + ngx.header["Content-Type"] = "text/plain" + ngx.say("data: hello from upstream") + end) + + local route_conf = { + uri = "/sse", + plugins = { + sse = {} + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } } + t.add_route("/t/route/1", route_conf) + core.response.exit(200) } ---- response_body -passed - - - -=== TEST 2: test sse plugin with default configuration ---- config - location /t { - content_by_lua_block { - local http = require "resty.http" - local httpc = http.new() - local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/sse_test" - local res, err = httpc:request_uri(uri, { - method = "GET", - }) - - if not res then - ngx.log(ngx.ERR, "request failed: ", err) - ngx.status = 500 - return - end - - ngx.status = res.status - - -- Check headers - local content_type = res.headers["Content-Type"] - local x_accel_buffering = res.headers["X-Accel-Buffering"] - local cache_control = res.headers["Cache-Control"] - local connection = res.headers["Connection"] - - ngx.say("Status: ", res.status) - ngx.say("Content-Type: ", content_type or "nil") - ngx.say("X-Accel-Buffering: ", x_accel_buffering or "nil") - ngx.say("Cache-Control: ", cache_control or "nil") - ngx.say("Connection: ", connection or "nil") - } - } ---- response_body_like -Status: 200 +} +--- request +GET /sse +--- response_headers Content-Type: text/event-stream; charset=utf-8 X-Accel-Buffering: no Cache-Control: no-cache Connection: keep-alive +--- response_body +data: hello from upstream +--- error_code: 200 +--- no_error_log +[error] -=== TEST 3: test sse plugin with override_content_type = false +=== TEST 2: sse plugin with override_content_type = false --- config - location /t { - content_by_lua_block { - local t = require("lib.test_admin").test - local code, body = t('/apisix/admin/routes/1', - ngx.HTTP_PUT, - [[{ - "uri": "/sse_test", - "plugins": { - "sse": { - "override_content_type": false - } - }, - "upstream_id": "1" - }]] - ) - - if code >= 300 then - ngx.status = code - return - end - - local http = require "resty.http" - local httpc = http.new() - local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/sse_test" - local res, err = httpc:request_uri(uri, { - method = "GET", - }) - - if not res then - ngx.log(ngx.ERR, "request failed: ", err) - ngx.status = 500 - return - end - - ngx.status = res.status - - -- The upstream (port 1980) should return HTML by default - -- So Content-Type should NOT be text/event-stream - local content_type = res.headers["Content-Type"] - local x_accel_buffering = res.headers["X-Accel-Buffering"] - - ngx.say("Status: ", res.status) - ngx.say("Content-Type: ", content_type or "nil") - ngx.say("X-Accel-Buffering: ", x_accel_buffering or "nil") +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + -- upstream returns JSON + t.set_upstream(function() + ngx.header["Content-Type"] = "application/json" + ngx.say("{\"message\": \"hello\"}") + end) + + local route_conf = { + uri = "/sse_no_override", + plugins = { + sse = { + override_content_type = false + } + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } } + t.add_route("/t/route/2", route_conf) + core.response.exit(200) } ---- response_body_like -Status: 200 -Content-Type: (?!text/event-stream).* +} +--- request +GET /sse_no_override +--- response_headers +Content-Type: application/json X-Accel-Buffering: no +Cache-Control: no-cache +Connection: keep-alive +--- response_body +{"message": "hello"} +--- error_code: 200 +--- no_error_log +[error] -=== TEST 4: test sse plugin with custom headers +=== TEST 3: sse plugin with custom connection and cache-control headers --- config - location /t { - content_by_lua_block { - local t = require("lib.test_admin").test - local code, body = t('/apisix/admin/routes/1', - ngx.HTTP_PUT, - [[{ - "uri": "/sse_test", - "plugins": { - "sse": { - "connection_header": "Upgrade", - "cache_control": "public, max-age=3600" - } - }, - "upstream_id": "1" - }]] - ) - - if code >= 300 then - ngx.status = code - return - end - - local http = require "resty.http" - local httpc = http.new() - local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/sse_test" - local res, err = httpc:request_uri(uri, { - method = "GET", - }) - - if not res then - ngx.log(ngx.ERR, "request failed: ", err) - ngx.status = 500 - return - end - - ngx.status = res.status - - local cache_control = res.headers["Cache-Control"] - local connection = res.headers["Connection"] - - ngx.say("Status: ", res.status) - ngx.say("Cache-Control: ", cache_control or "nil") - ngx.say("Connection: ", connection or "nil") +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + -- upstream returns SSE data + t.set_upstream(function() + ngx.header["Content-Type"] = "text/plain" + ngx.say("data: hello from upstream") + end) + + local route_conf = { + uri = "/sse_custom", + plugins = { + sse = { + connection_header = "Upgrade", + cache_control = "public, max-age=86400" + } + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } } + t.add_route("/t/route/3", route_conf) + core.response.exit(200) } ---- response_body_like -Status: 200 -Cache-Control: public, max-age=3600 +} +--- request +GET /sse_custom +--- response_headers +Content-Type: text/event-stream; charset=utf-8 +X-Accel-Buffering: no +Cache-Control: public, max-age=86400 Connection: Upgrade - - - -=== TEST 5: test sse plugin schema validation ---- config - location /t { - content_by_lua_block { - local plugin = require("apisix.plugins.sse") - - -- Test valid config - local ok, err = plugin.check_schema({}) - if not ok then - ngx.say("Default schema validation failed: ", err) - return - end - - -- Test invalid proxy_read_timeout - local ok, err = plugin.check_schema({ - proxy_read_timeout = -1 - }) - if ok then - ngx.say("Schema validation should have failed for negative timeout") - return - end - - -- Test valid zero timeout - local ok, err = plugin.check_schema({ - proxy_read_timeout = 0 - }) - if not ok then - ngx.say("Zero timeout validation failed: ", err) - return - end - - ngx.say("Schema validation tests passed") - } - } --- response_body -Schema validation tests passed +data: hello from upstream +--- error_code: 200 +--- no_error_log +[error] -=== TEST 6: cleanup +=== TEST 4: route without SSE plugin (negative case) --- config - location /t { - content_by_lua_block { - local t = require("lib.test_admin").test - local code, body = t('/apisix/admin/routes/1', ngx.HTTP_DELETE) - local code, body = t('/apisix/admin/upstreams/1', ngx.HTTP_DELETE) - ngx.say("passed") +location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("t.test_funcs") + + -- upstream returns plain text + t.set_upstream(function() + ngx.header["Content-Type"] = "text/plain" + ngx.say("data: hello from upstream") + end) + + local route_conf = { + uri = "/no_sse", + plugins = { + -- No SSE plugin here + }, + upstream = { + type = "roundrobin", + nodes = { + [t.upstream_server_host_1] = 1, + } + } } + t.add_route("/t/route/4", route_conf) + core.response.exit(200) } +} +--- request +GET /no_sse +--- response_headers_unlike +Content-Type: text/event-stream +X-Accel-Buffering: .* +Cache-Control: .* +Connection: .* --- response_body -passed \ No newline at end of file +data: hello from upstream +--- error_code: 200 +--- no_error_log +[error]