Files
ReachableCEO 54cc5f7308 feat(apisix): add Cloudron package
- Implements Apache APISIX packaging for Cloudron platform.
- Includes Dockerfile, CloudronManifest.json, and start.sh.
- Configured to use Cloudron's etcd addon.

🤖 Generated with Gemini CLI
Co-Authored-By: Gemini <noreply@google.com>
2025-09-04 09:42:47 -05:00

368 lines
11 KiB
Lua

--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local ngx = ngx
local format = string.format
local ipairs = ipairs
local error = error
local tonumber = tonumber
local local_conf = require("apisix.core.config_local").local_conf()
local core = require("apisix.core")
local mysql = require("resty.mysql")
local is_http = ngx.config.subsystem == "http"
local process = require("ngx.process")
local endpoint_dict
local full_query_sql = [[ select servant, group_concat(endpoint order by endpoint) as endpoints
from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
where setting_state = 'active' and present_state = 'active'
group by servant ]]
local incremental_query_sql = [[
select servant, (setting_state = 'active' and present_state = 'active') activated,
group_concat(endpoint order by endpoint) endpoints
from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
where (application, server_name) in
(
select application, server_name from t_server_conf
where registry_timestamp > now() - interval %d second
union
select application, server_name from t_adapter_conf
where registry_timestamp > now() - interval %d second
)
group by servant, activated order by activated desc ]]
local _M = {
version = 0.1,
}
local default_weight
local last_fetch_full_time = 0
local last_db_error
local endpoint_lrucache = core.lrucache.new({
ttl = 300,
count = 1024
})
local activated_buffer = core.table.new(10, 0)
local nodes_buffer = core.table.new(0, 5)
--[[
endpoints format as follows:
tcp -h 172.16.1.1 -p 11 -t 6000 -e 0,tcp -e 0 -p 12 -h 172.16.1.1,tcp -p 13 -h 172.16.1.1
we extract host and port value via endpoints_pattern
--]]
local endpoints_pattern = core.table.concat(
{ [[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
[[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
)
local function update_endpoint(servant, nodes)
local endpoint_content = core.json.encode(nodes, true)
local endpoint_version = ngx.crc32_long(endpoint_content)
core.log.debug("set servant ", servant, endpoint_content)
local _, err
_, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
if err then
core.log.error("set endpoint version into nginx shared dict failed, ", err)
return
end
_, err = endpoint_dict:safe_set(servant, endpoint_content)
if err then
core.log.error("set endpoint into nginx shared dict failed, ", err)
endpoint_dict:delete(servant .. "#version")
end
end
local function delete_endpoint(servant)
core.log.info("delete servant ", servant)
endpoint_dict:delete(servant .. "#version")
endpoint_dict:delete(servant)
end
local function add_endpoint_to_lrucache(servant)
local endpoint_content, err = endpoint_dict:get_stale(servant)
if not endpoint_content then
core.log.error("get empty endpoint content, servant: ", servant, ", err: ", err)
return nil
end
local endpoint, err = core.json.decode(endpoint_content)
if not endpoint then
core.log.error("decode json failed, content: ", endpoint_content, ", err: ", err)
return nil
end
return endpoint
end
local function get_endpoint(servant)
--[[
fetch_full function will:
1: call endpoint_dict:flush_all()
2: setup servant:nodes pairs into endpoint_dict
3: call endpoint_dict:flush_expired()
get_endpoint may be called during the 2 step of the fetch_full function,
so we must use endpoint_dict:get_stale() to get value instead endpoint_dict:get()
--]]
local endpoint_version, err = endpoint_dict:get_stale(servant .. "#version")
if not endpoint_version then
if err then
core.log.error("get empty endpoint version, servant: ", servant, ", err: ", err)
end
return nil
end
return endpoint_lrucache(servant, endpoint_version, add_endpoint_to_lrucache, servant)
end
local function extract_endpoint(query_result)
for _, p in ipairs(query_result) do
repeat
local servant = p.servant
if servant == ngx.null then
break
end
if p.activated == 1 then
activated_buffer[servant] = ngx.null
elseif p.activated == 0 then
if activated_buffer[servant] == nil then
delete_endpoint(servant)
end
break
end
core.table.clear(nodes_buffer)
local iterator = ngx.re.gmatch(p.endpoints, endpoints_pattern, "jao")
while true do
local captures, err = iterator()
if err then
core.log.error("gmatch failed, error: ", err, " , endpoints: ", p.endpoints)
break
end
if not captures then
break
end
local host, port
if captures[3] == "h" or captures[3] == "H" then
host = captures[4]
port = tonumber(captures[8])
else
host = captures[8]
port = tonumber(captures[4])
end
core.table.insert(nodes_buffer, {
host = host,
port = port,
weight = default_weight,
})
end
update_endpoint(servant, nodes_buffer)
until true
end
end
local function fetch_full(db_cli)
local res, err, errcode, sqlstate = db_cli:query(full_query_sql)
--[[
res format is as follows:
{
{
servant = "A.AServer.FirstObj",
endpoints = "tcp -h 172.16.1.1 -p 10001 -e 0 -t 3000,tcp -p 10002 -h 172.16.1.2 -t 3000"
},
{
servant = "A.AServer.SecondObj",
endpoints = "tcp -t 3000 -p 10002 -h 172.16.1.2"
},
}
if current endpoint_dict is as follows:
key1:nodes1, key2:nodes2, key3:nodes3
then fetch_full get follow results:
key1:nodes1, key4:nodes4, key5:nodes5
at this time, we need
1: setup key4:nodes4, key5:nodes5
2: delete key2:nodes2, key3:nodes3
to achieve goals, we should:
1: before setup results, execute endpoint_dict:flush_all()
2: after setup results, execute endpoint_dict:flush_expired()
--]]
if not res then
core.log.error("query failed, error: ", err, ", ", errcode, " ", sqlstate)
return err
end
endpoint_dict:flush_all()
extract_endpoint(res)
while err == "again" do
res, err, errcode, sqlstate = db_cli:read_result()
if not res then
if err then
core.log.error("read result failed, error: ", err, ", ", errcode, " ", sqlstate)
end
return err
end
extract_endpoint(res)
end
endpoint_dict:flush_expired()
return nil
end
local function fetch_incremental(db_cli)
local res, err, errcode, sqlstate = db_cli:query(incremental_query_sql)
--[[
res is as follows:
{
{
activated=1,
servant = "A.AServer.FirstObj",
endpoints = "tcp -h 172.16.1.1 -p 10001 -e 0 -t 3000,tcp -p 10002 -h 172.16.1.2 -t 3000"
},
{
activated=0,
servant = "A.AServer.FirstObj",
endpoints = "tcp -t 3000 -p 10001 -h 172.16.1.3"
},
{
activated=0,
servant = "B.BServer.FirstObj",
endpoints = "tcp -t 3000 -p 10002 -h 172.16.1.2"
},
}
for each item:
if activated==1, setup
if activated==0, if there is a other item had same servant and activate==1, ignore
if activated==0, and there is no other item had same servant, delete
--]]
if not res then
core.log.error("query failed, error: ", err, ", ", errcode, " ", sqlstate)
return err
end
core.table.clear(activated_buffer)
extract_endpoint(res)
while err == "again" do
res, err, errcode, sqlstate = db_cli:read_result()
if not res then
if err then
core.log.error("read result failed, error: ", err, ", ", errcode, " ", sqlstate)
end
return err
end
extract_endpoint(res)
end
return nil
end
local function fetch_endpoint(premature, conf)
if premature then
return
end
local db_cli, err = mysql:new()
if not db_cli then
core.log.error("failed to instantiate mysql: ", err)
return
end
db_cli:set_timeout(3000)
local ok, err, errcode, sqlstate = db_cli:connect(conf.db_conf)
if not ok then
core.log.error("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate)
return
end
local now = ngx.time()
if last_db_error or last_fetch_full_time + conf.full_fetch_interval <= now then
last_fetch_full_time = now
last_db_error = fetch_full(db_cli)
else
last_db_error = fetch_incremental(db_cli)
end
if not last_db_error then
db_cli:set_keepalive(120 * 1000, 1)
end
end
function _M.nodes(servant)
return get_endpoint(servant)
end
local function get_endpoint_dict()
local shm = "tars"
if not is_http then
shm = shm .. "-stream"
end
return ngx.shared[shm]
end
function _M.init_worker()
endpoint_dict = get_endpoint_dict()
if not endpoint_dict then
error("failed to get lua_shared_dict: tars, please check your APISIX version")
end
if process.type() ~= "privileged agent" then
return
end
local conf = local_conf.discovery.tars
default_weight = conf.default_weight
core.log.info("conf ", core.json.delay_encode(conf))
local backtrack_time = conf.incremental_fetch_interval + 5
incremental_query_sql = format(incremental_query_sql, backtrack_time, backtrack_time)
ngx.timer.at(0, fetch_endpoint, conf)
ngx.timer.every(conf.incremental_fetch_interval, fetch_endpoint, conf)
end
return _M