Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 105 additions & 45 deletions apisix/discovery/nacos/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ local core = require('apisix.core')
local ipairs = ipairs
local pairs = pairs
local type = type
local math = math
local math_random = math.random
local ngx = ngx
local ngx_re = require('ngx.re')
Expand Down Expand Up @@ -164,10 +163,7 @@ local function get_signed_param(group_name, service_name)
end


local function get_base_uri()
local host = local_conf.discovery.nacos.host
-- TODO Add health check to get healthy nodes.
local url = host[math_random(#host)]
local function build_base_uri(url)
local auth_idx = core.string.rfind_char(url, '@')
local username, password
if auth_idx then
Expand Down Expand Up @@ -195,6 +191,21 @@ local function get_base_uri()
end


local function get_base_uri_by_index(index)
local host = local_conf.discovery.nacos.host
if type(host) ~= 'table' then
return nil
end

local url = host[index]
if not url then
return nil
end

return build_base_uri(url)
end


local function de_duplication(services, namespace_id, group_name, service_name, scheme)
for _, service in ipairs(services) do
if service.namespace_id == namespace_id and service.group_name == group_name
Expand Down Expand Up @@ -277,69 +288,118 @@ local function is_grpc(scheme)
end

local curr_service_in_use = {}
local function fetch_full_registry(premature)
if premature then
return
end

local base_uri, username, password = get_base_uri()

local function fetch_from_host(base_uri, username, password, services)
local token_param, err = get_token_param(base_uri, username, password)
if err then
log.error('get_token_param error:', err)
return
return false, err
end

local infos = get_nacos_services()
if #infos == 0 then
return
end
local service_names = {}
for _, service_info in ipairs(infos) do
local data, err
local nodes_cache = {}
local had_success = false
local last_err

for _, service_info in ipairs(services) do
local namespace_id = service_info.namespace_id
local group_name = service_info.group_name
local scheme = service_info.scheme or ''
local namespace_param = get_namespace_param(service_info.namespace_id)
local group_name_param = get_group_name_param(service_info.group_name)
local signature_param = get_signed_param(service_info.group_name, service_info.service_name)
local namespace_param = get_namespace_param(namespace_id)
local group_name_param = get_group_name_param(group_name)
local signature_param = get_signed_param(group_name, service_info.service_name)
local query_path = instance_list_path .. service_info.service_name
.. token_param .. namespace_param .. group_name_param
.. signature_param
data, err = get_url(base_uri, query_path)
if err then
log.error('get_url:', query_path, ' err:', err)
goto CONTINUE
end
local data, req_err = get_url(base_uri, query_path)
if req_err then
last_err = req_err
log.warn('get_url:', query_path, ' err:', req_err, ', host:', base_uri)
else
had_success = true

local nodes = {}
local key = get_key(namespace_id, group_name, service_info.service_name)
service_names[key] = true
for _, host in ipairs(data.hosts) do
local node = {
host = host.ip,
port = host.port,
weight = host.weight or default_weight,
}
-- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
node.port = host.metadata.gRPC_port
local key = get_key(namespace_id, group_name, service_info.service_name)
service_names[key] = true

local hosts = data.hosts
if type(hosts) ~= 'table' then
hosts = {}
end

core.table.insert(nodes, node)
end
if #nodes > 0 then
local content = core.json.encode(nodes)
nacos_dict:set(key, content)
local nodes = {}
for _, host in ipairs(hosts) do
local node = {
host = host.ip,
port = host.port,
weight = host.weight or default_weight,
}
-- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
node.port = host.metadata.gRPC_port
end

core.table.insert(nodes, node)
end

if #nodes > 0 then
nodes_cache[key] = nodes
end
end
::CONTINUE::
end
-- remove services that are not in use anymore

if not had_success then
return false, last_err or 'no available nacos services'
end

for key, nodes in pairs(nodes_cache) do
local content = core.json.encode(nodes)
nacos_dict:set(key, content)
end

for key, _ in pairs(curr_service_in_use) do
if not service_names[key] then
nacos_dict:delete(key)
end
end

curr_service_in_use = service_names
return true
end


local function fetch_full_registry(premature)
if premature then
return
end

local infos = get_nacos_services()
if #infos == 0 then
return
end

local host_list = local_conf.discovery.nacos.host
local host_count = #host_list
local start = math_random(host_count)
local last_err

for i = 0, host_count - 1 do
local idx = (start + i - 1) % host_count + 1
local base_uri, username, password = get_base_uri_by_index(idx)

if not base_uri then
last_err = 'invalid nacos host entry'
log.warn('nacos host at index ', idx, ' is invalid, skip')
else
local ok, err = fetch_from_host(base_uri, username, password, infos)
if ok then
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remember index of healthy one in a variable in this lua module, so that we can chose the healthy one nacos host directly in next sync.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea is that the states of health and unhealth are unreliable without health checks, as they can change at any time. Therefore, I don't see the need to store these states.

The current implementation is only a minor improvement; adding health checks will completely eliminate this problem.

end
last_err = err
log.warn('fetch_from_host failed from host ', base_uri, ': ', err)
end
end

log.error('failed to fetch nacos registry from all hosts: ', last_err)
end


Expand Down
46 changes: 46 additions & 0 deletions t/discovery/nacos2.t
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,49 @@ discovery:
}
--- response_body
2



=== TEST 6: fallback to next nacos host when current host fails
--- yaml_config
apisix:
node_listen: 1984
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
nacos:
host:
- "http://127.0.0.1:20998"
- "http://127.0.0.1:8858"
prefix: "/nacos/v1/"
fetch_interval: 1
weight: 1
timeout:
connect: 2000
send: 2000
read: 5000
--- apisix_yaml
routes:
-
uri: /hello
upstream:
service_name: APISIX-NACOS
discovery_type: nacos
type: roundrobin
#END
--- http_config
server {
listen 20998;

location / {
return 502;
}
}
--- request
GET /hello
--- response_body_like eval
qr/server [1-2]/
--- error_log
fetch_from_host failed from host http://127.0.0.1:20998/nacos/v1/: status = 502
Loading