- Implements Apache APISIX packaging for Cloudron platform. - Includes Dockerfile, CloudronManifest.json, and start.sh. - Configured to use Cloudron's etcd addon. 🤖 Generated with Gemini CLI Co-Authored-By: Gemini <noreply@google.com>
393 lines
12 KiB
Lua
393 lines
12 KiB
Lua
--
|
|
-- Licensed to the Apache Software Foundation (ASF) under one or more
|
|
-- contributor license agreements. See the NOTICE file distributed with
|
|
-- this work for additional information regarding copyright ownership.
|
|
-- The ASF licenses this file to You under the Apache License, Version 2.0
|
|
-- (the "License"); you may not use this file except in compliance with
|
|
-- the License. You may obtain a copy of the License at
|
|
--
|
|
-- http://www.apache.org/licenses/LICENSE-2.0
|
|
--
|
|
-- Unless required by applicable law or agreed to in writing, software
|
|
-- distributed under the License is distributed on an "AS IS" BASIS,
|
|
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
-- See the License for the specific language governing permissions and
|
|
-- limitations under the License.
|
|
--
|
|
|
|
local require = require
|
|
local local_conf = require('apisix.core.config_local').local_conf()
|
|
local http = require('resty.http')
|
|
local core = require('apisix.core')
|
|
local ipairs = ipairs
|
|
local pairs = pairs
|
|
local type = type
|
|
local math = math
|
|
local math_random = math.random
|
|
local ngx = ngx
|
|
local ngx_re = require('ngx.re')
|
|
local ngx_timer_at = ngx.timer.at
|
|
local ngx_timer_every = ngx.timer.every
|
|
local string = string
|
|
local string_sub = string.sub
|
|
local str_byte = string.byte
|
|
local str_find = core.string.find
|
|
local log = core.log
|
|
|
|
local default_weight
|
|
local nacos_dict = ngx.shared.nacos --key: namespace_id.group_name.service_name
|
|
if not nacos_dict then
|
|
error("lua_shared_dict \"nacos\" not configured")
|
|
end
|
|
|
|
local auth_path = 'auth/login'
|
|
local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
|
|
local default_namespace_id = "public"
|
|
local default_group_name = "DEFAULT_GROUP"
|
|
local access_key
|
|
local secret_key
|
|
|
|
|
|
local _M = {}
|
|
|
|
local function get_key(namespace_id, group_name, service_name)
|
|
return namespace_id .. '.' .. group_name .. '.' .. service_name
|
|
end
|
|
|
|
local function request(request_uri, path, body, method, basic_auth)
|
|
local url = request_uri .. path
|
|
log.info('request url:', url)
|
|
local headers = {}
|
|
headers['Accept'] = 'application/json'
|
|
|
|
if basic_auth then
|
|
headers['Authorization'] = basic_auth
|
|
end
|
|
|
|
if body and 'table' == type(body) then
|
|
local err
|
|
body, err = core.json.encode(body)
|
|
if not body then
|
|
return nil, 'invalid body : ' .. err
|
|
end
|
|
headers['Content-Type'] = 'application/json'
|
|
end
|
|
|
|
local httpc = http.new()
|
|
local timeout = local_conf.discovery.nacos.timeout
|
|
local connect_timeout = timeout.connect
|
|
local send_timeout = timeout.send
|
|
local read_timeout = timeout.read
|
|
log.info('connect_timeout:', connect_timeout, ', send_timeout:', send_timeout,
|
|
', read_timeout:', read_timeout)
|
|
httpc:set_timeouts(connect_timeout, send_timeout, read_timeout)
|
|
local res, err = httpc:request_uri(url, {
|
|
method = method,
|
|
headers = headers,
|
|
body = body,
|
|
ssl_verify = true,
|
|
})
|
|
if not res then
|
|
return nil, err
|
|
end
|
|
|
|
if not res.body or res.status ~= 200 then
|
|
return nil, 'status = ' .. res.status
|
|
end
|
|
|
|
local json_str = res.body
|
|
local data, err = core.json.decode(json_str)
|
|
if not data then
|
|
return nil, err
|
|
end
|
|
return data
|
|
end
|
|
|
|
|
|
local function get_url(request_uri, path)
|
|
return request(request_uri, path, nil, 'GET', nil)
|
|
end
|
|
|
|
|
|
local function post_url(request_uri, path, body)
|
|
return request(request_uri, path, body, 'POST', nil)
|
|
end
|
|
|
|
|
|
local function get_token_param(base_uri, username, password)
|
|
if not username or not password then
|
|
return ''
|
|
end
|
|
|
|
local args = { username = username, password = password}
|
|
local data, err = post_url(base_uri, auth_path .. '?' .. ngx.encode_args(args), nil)
|
|
if err then
|
|
log.error('nacos login fail:', username, ' ', password, ' desc:', err)
|
|
return nil, err
|
|
end
|
|
return '&accessToken=' .. data.accessToken
|
|
end
|
|
|
|
|
|
local function get_namespace_param(namespace_id)
|
|
local param = ''
|
|
if namespace_id then
|
|
local args = {namespaceId = namespace_id}
|
|
param = '&' .. ngx.encode_args(args)
|
|
end
|
|
return param
|
|
end
|
|
|
|
|
|
local function get_group_name_param(group_name)
|
|
local param = ''
|
|
if group_name then
|
|
local args = {groupName = group_name}
|
|
param = '&' .. ngx.encode_args(args)
|
|
end
|
|
return param
|
|
end
|
|
|
|
|
|
local function get_signed_param(group_name, service_name)
|
|
local param = ''
|
|
if access_key ~= '' and secret_key ~= '' then
|
|
local str_to_sign = ngx.now() * 1000 .. '@@' .. group_name .. '@@' .. service_name
|
|
local args = {
|
|
ak = access_key,
|
|
data = str_to_sign,
|
|
signature = ngx.encode_base64(ngx.hmac_sha1(secret_key, str_to_sign))
|
|
}
|
|
param = '&' .. ngx.encode_args(args)
|
|
end
|
|
return param
|
|
end
|
|
|
|
|
|
local function get_base_uri()
|
|
local host = local_conf.discovery.nacos.host
|
|
-- TODO Add health check to get healthy nodes.
|
|
local url = host[math_random(#host)]
|
|
local auth_idx = core.string.rfind_char(url, '@')
|
|
local username, password
|
|
if auth_idx then
|
|
local protocol_idx = str_find(url, '://')
|
|
local protocol = string_sub(url, 1, protocol_idx + 2)
|
|
local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1)
|
|
local arr = ngx_re.split(user_and_password, ':')
|
|
if #arr == 2 then
|
|
username = arr[1]
|
|
password = arr[2]
|
|
end
|
|
local other = string_sub(url, auth_idx + 1)
|
|
url = protocol .. other
|
|
end
|
|
|
|
if local_conf.discovery.nacos.prefix then
|
|
url = url .. local_conf.discovery.nacos.prefix
|
|
end
|
|
|
|
if str_byte(url, #url) ~= str_byte('/') then
|
|
url = url .. '/'
|
|
end
|
|
|
|
return url, username, password
|
|
end
|
|
|
|
|
|
local function de_duplication(services, namespace_id, group_name, service_name, scheme)
|
|
for _, service in ipairs(services) do
|
|
if service.namespace_id == namespace_id and service.group_name == group_name
|
|
and service.service_name == service_name and service.scheme == scheme then
|
|
return true
|
|
end
|
|
end
|
|
return false
|
|
end
|
|
|
|
|
|
local function iter_and_add_service(services, values)
|
|
if not values then
|
|
return
|
|
end
|
|
|
|
for _, value in core.config_util.iterate_values(values) do
|
|
local conf = value.value
|
|
if not conf then
|
|
goto CONTINUE
|
|
end
|
|
|
|
local up
|
|
if conf.upstream then
|
|
up = conf.upstream
|
|
else
|
|
up = conf
|
|
end
|
|
|
|
local namespace_id = (up.discovery_args and up.discovery_args.namespace_id)
|
|
or default_namespace_id
|
|
|
|
local group_name = (up.discovery_args and up.discovery_args.group_name)
|
|
or default_group_name
|
|
|
|
local dup = de_duplication(services, namespace_id, group_name,
|
|
up.service_name, up.scheme)
|
|
if dup then
|
|
goto CONTINUE
|
|
end
|
|
|
|
if up.discovery_type == 'nacos' then
|
|
core.table.insert(services, {
|
|
service_name = up.service_name,
|
|
namespace_id = namespace_id,
|
|
group_name = group_name,
|
|
scheme = up.scheme,
|
|
})
|
|
end
|
|
::CONTINUE::
|
|
end
|
|
end
|
|
|
|
|
|
local function get_nacos_services()
|
|
local services = {}
|
|
|
|
-- here we use lazy load to work around circle dependency
|
|
local get_upstreams = require('apisix.upstream').upstreams
|
|
local get_routes = require('apisix.router').http_routes
|
|
local get_stream_routes = require('apisix.router').stream_routes
|
|
local get_services = require('apisix.http.service').services
|
|
local values = get_upstreams()
|
|
iter_and_add_service(services, values)
|
|
values = get_routes()
|
|
iter_and_add_service(services, values)
|
|
values = get_services()
|
|
iter_and_add_service(services, values)
|
|
values = get_stream_routes()
|
|
iter_and_add_service(services, values)
|
|
return services
|
|
end
|
|
|
|
local function is_grpc(scheme)
|
|
if scheme == 'grpc' or scheme == 'grpcs' then
|
|
return true
|
|
end
|
|
|
|
return false
|
|
end
|
|
|
|
local curr_service_in_use = {}
|
|
local function fetch_full_registry(premature)
|
|
if premature then
|
|
return
|
|
end
|
|
|
|
local base_uri, username, password = get_base_uri()
|
|
local token_param, err = get_token_param(base_uri, username, password)
|
|
if err then
|
|
log.error('get_token_param error:', err)
|
|
return
|
|
end
|
|
|
|
local infos = get_nacos_services()
|
|
if #infos == 0 then
|
|
return
|
|
end
|
|
local service_names = {}
|
|
for _, service_info in ipairs(infos) do
|
|
local data, err
|
|
local namespace_id = service_info.namespace_id
|
|
local group_name = service_info.group_name
|
|
local scheme = service_info.scheme or ''
|
|
local namespace_param = get_namespace_param(service_info.namespace_id)
|
|
local group_name_param = get_group_name_param(service_info.group_name)
|
|
local signature_param = get_signed_param(service_info.group_name, service_info.service_name)
|
|
local query_path = instance_list_path .. service_info.service_name
|
|
.. token_param .. namespace_param .. group_name_param
|
|
.. signature_param
|
|
data, err = get_url(base_uri, query_path)
|
|
if err then
|
|
log.error('get_url:', query_path, ' err:', err)
|
|
goto CONTINUE
|
|
end
|
|
|
|
local nodes = {}
|
|
local key = get_key(namespace_id, group_name, service_info.service_name)
|
|
service_names[key] = true
|
|
for _, host in ipairs(data.hosts) do
|
|
local node = {
|
|
host = host.ip,
|
|
port = host.port,
|
|
weight = host.weight or default_weight,
|
|
}
|
|
-- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
|
|
if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
|
|
node.port = host.metadata.gRPC_port
|
|
end
|
|
|
|
core.table.insert(nodes, node)
|
|
end
|
|
if #nodes > 0 then
|
|
local content = core.json.encode(nodes)
|
|
nacos_dict:set(key, content)
|
|
end
|
|
::CONTINUE::
|
|
end
|
|
-- remove services that are not in use anymore
|
|
for key, _ in pairs(curr_service_in_use) do
|
|
if not service_names[key] then
|
|
nacos_dict:delete(key)
|
|
end
|
|
end
|
|
curr_service_in_use = service_names
|
|
end
|
|
|
|
|
|
function _M.nodes(service_name, discovery_args)
|
|
local namespace_id = discovery_args and
|
|
discovery_args.namespace_id or default_namespace_id
|
|
local group_name = discovery_args
|
|
and discovery_args.group_name or default_group_name
|
|
local key = get_key(namespace_id, group_name, service_name)
|
|
local value = nacos_dict:get(key)
|
|
if not value then
|
|
core.log.error("nacos service not found: ", service_name)
|
|
return nil
|
|
end
|
|
local nodes = core.json.decode(value)
|
|
return nodes
|
|
end
|
|
|
|
|
|
function _M.init_worker()
|
|
default_weight = local_conf.discovery.nacos.weight
|
|
log.info('default_weight:', default_weight)
|
|
local fetch_interval = local_conf.discovery.nacos.fetch_interval
|
|
log.info('fetch_interval:', fetch_interval)
|
|
access_key = local_conf.discovery.nacos.access_key
|
|
secret_key = local_conf.discovery.nacos.secret_key
|
|
ngx_timer_at(0, fetch_full_registry)
|
|
ngx_timer_every(fetch_interval, fetch_full_registry)
|
|
end
|
|
|
|
|
|
function _M.dump_data()
|
|
local keys = nacos_dict:get_keys(0)
|
|
local applications = {}
|
|
for _, key in ipairs(keys) do
|
|
local value = nacos_dict:get(key)
|
|
if value then
|
|
local nodes = core.json.decode(value)
|
|
if nodes then
|
|
applications[key] = {
|
|
nodes = nodes,
|
|
}
|
|
end
|
|
end
|
|
end
|
|
return {services = applications or {}}
|
|
end
|
|
|
|
|
|
return _M
|