Files
ReachableCEO 54cc5f7308 feat(apisix): add Cloudron package
- Implements Apache APISIX packaging for Cloudron platform.
- Includes Dockerfile, CloudronManifest.json, and start.sh.
- Configured to use Cloudron's etcd addon.

🤖 Generated with Gemini CLI
Co-Authored-By: Gemini <noreply@google.com>
2025-09-04 09:42:47 -05:00

280 lines
7.3 KiB
Lua

--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local require = require
local core = require("apisix.core")
local expr = require("resty.expr.v1")
local pairs = pairs
local ngx = ngx
local ngx_now = ngx.now
local OK = ngx.OK
local DECLINED = ngx.DECLINED
local DONE = ngx.DONE
local pcall = pcall
local ipairs = ipairs
local tostring = tostring
core.ctx.register_var("rpc_time", function(ctx)
return ctx._rpc_end_time - ctx._rpc_start_time
end)
local logger_expr_cache = core.lrucache.new({
ttl = 300, count = 1024
})
local _M = {}
local function open_session(conn_ctx)
conn_ctx.xrpc_session = {
conn_ctx = conn_ctx,
route = conn_ctx.matched_route.value,
-- fields start with '_' should not be accessed by the protocol implementation
_upstream_conf = conn_ctx.matched_upstream,
_ctxs = {},
}
return conn_ctx.xrpc_session
end
local function put_req_ctx(session, ctx)
local id = ctx._id
session._ctxs[id] = nil
core.ctx.release_vars(ctx)
core.tablepool.release("xrpc_ctxs", ctx)
end
local function filter_logger(ctx, logger)
if not logger then
return false
end
if not logger.filter or #logger.filter == 0 then
-- no valid filter, default execution plugin
return true
end
local version = tostring(logger.filter)
local filter_expr, err = logger_expr_cache(ctx.conf_id, version, expr.new, logger.filter)
if not filter_expr or err then
core.log.error("failed to validate the 'filter' expression: ", err)
return false
end
return filter_expr:eval(ctx.var)
end
local function run_log_plugin(ctx, logger)
local pkg_name = "apisix.stream.plugins." .. logger.name
local ok, plugin = pcall(require, pkg_name)
if not ok then
core.log.error("failed to load plugin [", logger.name, "] err: ", plugin)
return
end
local log_func = plugin.log
if log_func then
log_func(logger.conf, ctx)
end
end
local function finialize_req(protocol, session, ctx)
ctx._rpc_end_time = ngx_now()
local loggers = session.route.protocol.logger
if loggers and #loggers > 0 then
for _, logger in ipairs(loggers) do
ctx.conf_id = tostring(logger.conf)
local matched = filter_logger(ctx, logger)
core.log.info("log filter: ", logger.name, " filter result: ", matched)
if matched then
run_log_plugin(ctx, logger)
end
end
end
protocol.log(session, ctx)
put_req_ctx(session, ctx)
end
local function close_session(session, protocol)
local upstream_ctx = session._upstream_ctx
if upstream_ctx then
upstream_ctx.closed = true
local up = upstream_ctx.upstream
protocol.disconnect_upstream(session, up)
end
local upstream_ctxs = session._upstream_ctxs
if upstream_ctxs then
for _, upstream_ctx in pairs(upstream_ctxs) do
upstream_ctx.closed = true
local up = upstream_ctx.upstream
protocol.disconnect_upstream(session, up)
end
end
for id, ctx in pairs(session._ctxs) do
core.log.notice("RPC is not finished, id: ", id)
ctx.unfinished = true
finialize_req(protocol, session, ctx)
end
end
local function open_upstream(protocol, session, ctx)
local key = session._upstream_key
session._upstream_key = nil
if key then
if not session._upstream_ctxs then
session._upstream_ctxs = {}
end
local up_ctx = session._upstream_ctxs[key]
if up_ctx then
return OK, up_ctx
end
else
if session._upstream_ctx then
return OK, session._upstream_ctx
end
session.upstream_conf = session._upstream_conf
end
local state, upstream = protocol.connect_upstream(session, session)
if state ~= OK then
return state, nil
end
local up_ctx = {
upstream = upstream,
closed = false,
}
if key then
session._upstream_ctxs[key] = up_ctx
else
session._upstream_ctx = up_ctx
end
return OK, up_ctx
end
local function start_upstream_coroutine(session, protocol, downstream, up_ctx)
local upstream = up_ctx.upstream
while not up_ctx.closed do
local status, ctx = protocol.from_upstream(session, downstream, upstream)
if status ~= OK then
if ctx ~= nil then
finialize_req(protocol, session, ctx)
end
if status == DECLINED then
-- fail to read
break
end
if status == DONE then
-- a rpc is finished
goto continue
end
end
::continue::
end
end
function _M.run(protocol, conn_ctx)
local session = open_session(conn_ctx)
local downstream = protocol.init_downstream(session)
while true do
local status, ctx = protocol.from_downstream(session, downstream)
if status ~= OK then
if ctx ~= nil then
finialize_req(protocol, session, ctx)
end
if status == DECLINED then
-- fail to read or can't be authorized
break
end
if status == DONE then
-- heartbeat or fault injection, already reply to downstream
goto continue
end
end
-- need to do some auth/routing jobs before reaching upstream
local status, up_ctx = open_upstream(protocol, session, ctx)
if status ~= OK then
if ctx ~= nil then
finialize_req(protocol, session, ctx)
end
break
end
status = protocol.to_upstream(session, ctx, downstream, up_ctx.upstream)
if status ~= OK then
if ctx ~= nil then
finialize_req(protocol, session, ctx)
end
if status == DECLINED then
break
end
if status == DONE then
-- for Unary request we can directly reply here
goto continue
end
end
if not up_ctx.coroutine then
local co, err = ngx.thread.spawn(
start_upstream_coroutine, session, protocol, downstream, up_ctx)
if not co then
core.log.error("failed to start upstream coroutine: ", err)
break
end
up_ctx.coroutine = co
end
::continue::
end
close_session(session, protocol)
-- return non-zero code to terminal the session
return 200
end
return _M