From d38f6a659018d60ceb8f73db33c67c0907069fcd Mon Sep 17 00:00:00 2001 From: chronolaw Date: Thu, 26 Dec 2024 07:40:47 +0800 Subject: [PATCH 01/12] comments clean --- kong/clustering/rpc/concentrator.lua | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 80d19cad769..98a58b9a9f5 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -7,6 +7,8 @@ local queue = require("kong.clustering.rpc.queue") local cjson = require("cjson") local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local rpc_utils = require("kong.clustering.rpc.utils") +--local isarray = require("table.isarray") +--local isempty = require("table.isempty") local setmetatable = setmetatable @@ -190,8 +192,8 @@ function _M:_event_loop(lconn) ::continue:: end - end - end + end -- if n.channel == rpc_resp_channel_name + end -- while true local res, err = lconn:wait_for_notification() if not res then @@ -217,7 +219,7 @@ function _M:_event_loop(lconn) else notifications_queue:push(res) end - end + end -- get from rpc call end From 79f490694430f07a71a6a4e79a814e851e2024b6 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Thu, 26 Dec 2024 10:00:51 +0800 Subject: [PATCH 02/12] process one response --- kong/clustering/rpc/concentrator.lua | 42 ++++++++++++++++++---------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 98a58b9a9f5..9e21ad58dba 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -7,7 +7,7 @@ local queue = require("kong.clustering.rpc.queue") local cjson = require("cjson") local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local rpc_utils = require("kong.clustering.rpc.utils") ---local isarray = require("table.isarray") +local isarray = require("table.isarray") --local isempty = require("table.isempty") @@ -92,6 +92,27 @@ local function enqueue_notifications(notifications, notifications_queue) end +function _M:process_one_response(payload) + assert(payload.jsonrpc == jsonrpc.VERSION) + local payload_id = payload.id + + -- response + local cb = self.interest[payload_id] + self.interest[payload_id] = nil -- edge trigger only once + + if cb then + local res, err = cb(payload) + if not res then + ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ", + payload_id, ", err: ", err) + end + + else + ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload_id, ", dropping it") + end +end + + function _M:_event_loop(lconn) local notifications_queue = queue.new(4096) local rpc_resp_channel_name = RESP_CHANNEL_PREFIX .. self.worker_id @@ -118,21 +139,14 @@ function _M:_event_loop(lconn) if n.channel == rpc_resp_channel_name then -- an response for a previous RPC call we asked for local payload = cjson_decode(n.payload) - assert(payload.jsonrpc == jsonrpc.VERSION) - - -- response - local cb = self.interest[payload.id] - self.interest[payload.id] = nil -- edge trigger only once - if cb then - local res, err = cb(payload) - if not res then - ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ", - payload.id, ", err: ", err) - end + if not isarray(payload) then + process_one_response(payload) else - ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload.id, ", dropping it") + for _, v in ipairs(payload) do + process_one_response(v) + end end else @@ -191,7 +205,7 @@ function _M:_event_loop(lconn) end ::continue:: - end + end -- for _, call end -- if n.channel == rpc_resp_channel_name end -- while true From 028c75085bf6fbac0128254210517821169670f0 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Thu, 26 Dec 2024 10:21:30 +0800 Subject: [PATCH 03/12] lint fix --- kong/clustering/rpc/concentrator.lua | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 9e21ad58dba..c979eb84df8 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -141,11 +141,11 @@ function _M:_event_loop(lconn) local payload = cjson_decode(n.payload) if not isarray(payload) then - process_one_response(payload) + self:process_one_response(payload) else for _, v in ipairs(payload) do - process_one_response(v) + self:process_one_response(v) end end @@ -233,7 +233,7 @@ function _M:_event_loop(lconn) else notifications_queue:push(res) end - end -- get from rpc call + end -- while not exiting() end From 431ef24d022b84ddf1119020f110dc7a4d445bae Mon Sep 17 00:00:00 2001 From: chronolaw Date: Thu, 26 Dec 2024 15:06:25 +0800 Subject: [PATCH 04/12] process request --- kong/clustering/rpc/concentrator.lua | 78 ++++++++++++++++------------ 1 file changed, 46 insertions(+), 32 deletions(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index c979eb84df8..4a9fa86be53 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -113,6 +113,46 @@ function _M:process_one_response(payload) end +function _M:process_one_request(payload, collection) + local payload_id = payload.id + + local res, err = self.manager:_local_call(target_id, payload.method, + payload.params, not payload_id) + + -- notification has no callback or id + if not payload_id then + ngx_log(ngx_DEBUG, "[rpc] notification has no response") + return + end + + if res then + -- call success + res, err = self:_enqueue_rpc_response(reply_to, { + jsonrpc = jsonrpc.VERSION, + id = payload_id, + result = res, + }, collection) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err) + end + + else + -- call failure + res, err = self:_enqueue_rpc_response(reply_to, { + jsonrpc = jsonrpc.VERSION, + id = payload_id, + error = { + code = jsonrpc.SERVER_ERROR, + message = tostring(err), + } + }, collection) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) + end + end +end + + function _M:_event_loop(lconn) local notifications_queue = queue.new(4096) local rpc_resp_channel_name = RESP_CHANNEL_PREFIX .. self.worker_id @@ -144,8 +184,9 @@ function _M:_event_loop(lconn) self:process_one_response(payload) else + local collection = {} for _, v in ipairs(payload) do - self:process_one_response(v) + self:process_one_response(v, collection) end end @@ -169,42 +210,15 @@ function _M:_event_loop(lconn) local reply_to = assert(call.reply_to, "unknown requester for RPC") - local res, err = self.manager:_local_call(target_id, payload.method, - payload.params, not payload.id) - - -- notification has no callback or id - if not payload.id then - ngx_log(ngx_DEBUG, "[rpc] notification has no response") - goto continue - end - - if res then - -- call success - res, err = self:_enqueue_rpc_response(reply_to, { - jsonrpc = jsonrpc.VERSION, - id = payload.id, - result = res, - }) - if not res then - ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err) - end + if not isarray(payload) then + self:process_one_request(payload) else - -- call failure - res, err = self:_enqueue_rpc_response(reply_to, { - jsonrpc = jsonrpc.VERSION, - id = payload.id, - error = { - code = jsonrpc.SERVER_ERROR, - message = tostring(err), - } - }) - if not res then - ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) + for _, v in ipairs(payload) do + self:process_one_request(v) end end - ::continue:: end -- for _, call end -- if n.channel == rpc_resp_channel_name end -- while true From 926b561391519cc6103994f150eb2e312df3ac22 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Thu, 26 Dec 2024 15:07:59 +0800 Subject: [PATCH 05/12] process one request --- kong/clustering/rpc/concentrator.lua | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 4a9fa86be53..82016fb138e 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -113,7 +113,7 @@ function _M:process_one_response(payload) end -function _M:process_one_request(payload, collection) +function _M:process_one_request(target_id, reply_to, payload, collection) local payload_id = payload.id local res, err = self.manager:_local_call(target_id, payload.method, @@ -211,11 +211,11 @@ function _M:_event_loop(lconn) "unknown requester for RPC") if not isarray(payload) then - self:process_one_request(payload) + self:process_one_request(target_id, reply_to, payload) else for _, v in ipairs(payload) do - self:process_one_request(v) + self:process_one_request(target_id, reply_to, v) end end From bc00ca2cf6b174a6f24d784a07de13406c0d10b1 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Thu, 26 Dec 2024 15:17:08 +0800 Subject: [PATCH 06/12] process collection --- kong/clustering/rpc/concentrator.lua | 29 ++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 82016fb138e..17fa4409f50 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -8,7 +8,7 @@ local cjson = require("cjson") local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local rpc_utils = require("kong.clustering.rpc.utils") local isarray = require("table.isarray") ---local isempty = require("table.isempty") +local isempty = require("table.isempty") local setmetatable = setmetatable @@ -181,12 +181,13 @@ function _M:_event_loop(lconn) local payload = cjson_decode(n.payload) if not isarray(payload) then + -- one rpc response self:process_one_response(payload) else - local collection = {} + -- batch rpc response for _, v in ipairs(payload) do - self:process_one_response(v, collection) + self:process_one_response(v) end end @@ -211,13 +212,24 @@ function _M:_event_loop(lconn) "unknown requester for RPC") if not isarray(payload) then + -- one rpc call self:process_one_request(target_id, reply_to, payload) else + local collection = {} + + -- batching rpc call for _, v in ipairs(payload) do - self:process_one_request(target_id, reply_to, v) + self:process_one_request(target_id, reply_to, v, collection) end - end + + if not isempty(collection) then + local res, err = self:_enqueue_rpc_response(reply_to, collection) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err) + end + end + end -- if not isarray(payload) end -- for _, call end -- if n.channel == rpc_resp_channel_name @@ -300,7 +312,12 @@ end -- enqueue a RPC response from CP worker with ID worker_id -function _M:_enqueue_rpc_response(worker_id, payload) +function _M:_enqueue_rpc_response(worker_id, payload, collection) + if collection then + table.insert(collection, payload) + return + end + local sql = string_format("SELECT pg_notify(%s, %s);", self.db.connector:escape_literal(RESP_CHANNEL_PREFIX .. worker_id), self.db.connector:escape_literal(cjson_encode(payload))) From fee9a589576f1ff862d358be3a2aaa7a2537e8fa Mon Sep 17 00:00:00 2001 From: chronolaw Date: Thu, 26 Dec 2024 16:47:08 +0800 Subject: [PATCH 07/12] clean --- kong/clustering/rpc/concentrator.lua | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 17fa4409f50..6050b7a948f 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -100,15 +100,15 @@ function _M:process_one_response(payload) local cb = self.interest[payload_id] self.interest[payload_id] = nil -- edge trigger only once - if cb then - local res, err = cb(payload) - if not res then - ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ", - payload_id, ", err: ", err) - end - - else + if not cb then ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload_id, ", dropping it") + return + end + + local res, err = cb(payload) + if not res then + ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ", + payload_id, ", err: ", err) end end From 24814a8ab8f3514cca012e0b6334e2d4ecd375ad Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 27 Dec 2024 10:03:05 +0800 Subject: [PATCH 08/12] tb_insert --- kong/clustering/rpc/concentrator.lua | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 6050b7a948f..1500bf43bef 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -9,6 +9,7 @@ local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local rpc_utils = require("kong.clustering.rpc.utils") local isarray = require("table.isarray") local isempty = require("table.isempty") +local tb_insert = table.insert local setmetatable = setmetatable @@ -314,7 +315,7 @@ end -- enqueue a RPC response from CP worker with ID worker_id function _M:_enqueue_rpc_response(worker_id, payload, collection) if collection then - table.insert(collection, payload) + tb_insert(collection, payload) return end From f8c02c5e0339aee70482a031e0deec7b32249be2 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 27 Dec 2024 15:56:56 +0800 Subject: [PATCH 09/12] check payload --- kong/clustering/rpc/concentrator.lua | 51 ++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 1500bf43bef..51a310a9da7 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -12,6 +12,7 @@ local isempty = require("table.isempty") local tb_insert = table.insert +local type = type local setmetatable = setmetatable local tostring = tostring local pcall = pcall @@ -25,6 +26,7 @@ local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_WARN = ngx.WARN local ngx_DEBUG = ngx.DEBUG +local new_error = jsonrpc.new_error local RESP_CHANNEL_PREFIX = "rpc:resp:" -- format: rpc:resp: @@ -115,6 +117,18 @@ end function _M:process_one_request(target_id, reply_to, payload, collection) + if type(payload) ~= "table" then + local res, err = self:_enqueue_rpc_response( + reply_to, + new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request"), + collection) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) + end + + return + end + local payload_id = payload.id local res, err = self.manager:_local_call(target_id, payload.method, @@ -216,22 +230,37 @@ function _M:_event_loop(lconn) -- one rpc call self:process_one_request(target_id, reply_to, payload) - else - local collection = {} + goto continue + end - -- batching rpc call - for _, v in ipairs(payload) do - self:process_one_request(target_id, reply_to, v, collection) + -- rpc call with an empty Array + if isempty(payload) then + local res, err = self:_enqueue_rpc_response( + reply_to, + new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request")) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) end - if not isempty(collection) then - local res, err = self:_enqueue_rpc_response(reply_to, collection) - if not res then - ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err) - end + goto continue + end + + -- batching rpc call + + local collection = {} + + for _, v in ipairs(payload) do + self:process_one_request(target_id, reply_to, v, collection) + end + + if not isempty(collection) then + local res, err = self:_enqueue_rpc_response(reply_to, collection) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err) end - end -- if not isarray(payload) + end + ::continue:: end -- for _, call end -- if n.channel == rpc_resp_channel_name end -- while true From d47eed9ae5963f25b2400f2ef9c008d8e2b6e543 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Mon, 30 Dec 2024 15:54:27 +0800 Subject: [PATCH 10/12] comments --- kong/clustering/rpc/concentrator.lua | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 51a310a9da7..3de6093040d 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -342,6 +342,8 @@ end -- enqueue a RPC response from CP worker with ID worker_id +-- collection is only for rpc batch call. +-- if collection is nil, it means the rpc is a single call. function _M:_enqueue_rpc_response(worker_id, payload, collection) if collection then tb_insert(collection, payload) From b315d51c8bd2ea5fa2079378ed8e7d057260b3db Mon Sep 17 00:00:00 2001 From: chronolaw Date: Mon, 30 Dec 2024 17:01:18 +0800 Subject: [PATCH 11/12] err msg --- kong/clustering/rpc/concentrator.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 3de6093040d..0a40e0a25bb 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -120,7 +120,7 @@ function _M:process_one_request(target_id, reply_to, payload, collection) if type(payload) ~= "table" then local res, err = self:_enqueue_rpc_response( reply_to, - new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request"), + new_error(nil, jsonrpc.INVALID_REQUEST, "not an valid object"), collection) if not res then ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) @@ -237,7 +237,7 @@ function _M:_event_loop(lconn) if isempty(payload) then local res, err = self:_enqueue_rpc_response( reply_to, - new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request")) + new_error(nil, jsonrpc.INVALID_REQUEST, "empty batch array")) if not res then ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) end From 8acaf96906821d950490c240efa81b904026eab9 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 15 Jan 2025 14:02:50 +0800 Subject: [PATCH 12/12] check payload.id --- kong/clustering/rpc/concentrator.lua | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 0a40e0a25bb..6e2ab5785c2 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -99,6 +99,16 @@ function _M:process_one_response(payload) assert(payload.jsonrpc == jsonrpc.VERSION) local payload_id = payload.id + -- may be some error message for peer + if not payload_id then + if payload.error then + ngx_log(ngx_ERR, "[rpc] RPC failed, code: ", + payload.error.code, ", err: ", + payload.error.message) + end + return + end + -- response local cb = self.interest[payload_id] self.interest[payload_id] = nil -- edge trigger only once