--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local require = require
local core = require("apisix.core")
local discovery = require("apisix.discovery.init").discovery
local upstream_util = require("apisix.utils.upstream")
local apisix_ssl = require("apisix.ssl")
local events = require("apisix.events")
local error = error
local tostring = tostring
local ipairs = ipairs
local pairs = pairs
local pcall = pcall
local ngx_var = ngx.var
local is_http = ngx.config.subsystem == "http"
local upstreams
local healthcheck

local healthcheck_shdict_name = "upstream-healthcheck"
if not is_http then
    healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. ngx.config.subsystem
end

local set_upstream_tls_client_param
local ok, apisix_ngx_upstream = pcall(require, "resty.apisix.upstream")
if ok then
    set_upstream_tls_client_param = apisix_ngx_upstream.set_cert_and_key
else
    set_upstream_tls_client_param = function ()
        return nil, "need to build APISIX-Runtime to support upstream mTLS"
    end
end

local set_stream_upstream_tls
if not is_http then
    local ok, apisix_ngx_stream_upstream = pcall(require, "resty.apisix.stream.upstream")
    if ok then
        set_stream_upstream_tls = apisix_ngx_stream_upstream.set_tls
    else
        set_stream_upstream_tls = function ()
            return nil, "need to build APISIX-Runtime to support TLS over TCP upstream"
        end
    end
end

local HTTP_CODE_UPSTREAM_UNAVAILABLE = 503
local _M = {}


local function set_directly(ctx, key, ver, conf)
    if not ctx then
        error("missing argument ctx", 2)
    end
    if not key then
        error("missing argument key", 2)
    end
    if not ver then
        error("missing argument ver", 2)
    end
    if not conf then
        error("missing argument conf", 2)
    end

    ctx.upstream_conf = conf
    ctx.upstream_version = ver
    ctx.upstream_key = key
    return
end
_M.set = set_directly

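-- Illustrative usage sketch, not part of the module: a plugin such as
-- traffic-split can pin an upstream for the current request by calling
-- `_M.set` (set_directly) itself. The key, version and conf values below
-- are hypothetical.
--
--   local upstream = require("apisix.upstream")
--   local upstream_conf = {
--       type = "roundrobin",
--       scheme = "http",
--       pass_host = "pass",
--       nodes = {{host = "127.0.0.1", port = 8080, weight = 1, priority = 0}},
--   }
--   upstream.set(api_ctx, "example-upstream-key", "example-version", upstream_conf)
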
local function release_checker(healthcheck_parent)
    if not healthcheck_parent or not healthcheck_parent.checker then
        return
    end
    local checker = healthcheck_parent.checker
    core.log.info("try to release checker: ", tostring(checker))
    checker:delayed_clear(3)
    checker:stop()
end


local function get_healthchecker_name(value)
    return "upstream#" .. value.key
end
_M.get_healthchecker_name = get_healthchecker_name

local function create_checker(upstream)
    if healthcheck == nil then
        healthcheck = require("resty.healthcheck")
    end

    local healthcheck_parent = upstream.parent
    if healthcheck_parent.checker and healthcheck_parent.checker_upstream == upstream
        and healthcheck_parent.checker_nodes_ver == upstream._nodes_ver then
        return healthcheck_parent.checker
    end

    if upstream.is_creating_checker then
        core.log.info("another request is creating new checker")
        return nil
    end
    upstream.is_creating_checker = true

    core.log.debug("events module used by the healthcheck: ", events.events_module,
                   ", module name: ", events:get_healthcheck_events_modele())

    local checker, err = healthcheck.new({
        name = get_healthchecker_name(healthcheck_parent),
        shm_name = healthcheck_shdict_name,
        checks = upstream.checks,
        -- events.init_worker runs in the init_worker phase and sets
        -- events.healthcheck_events_module, while the healthcheck object is
        -- created in the http access phase, so the module can be used here
        events_module = events:get_healthcheck_events_modele(),
    })

    if not checker then
        core.log.error("failed to create healthcheck instance: ", err)
        upstream.is_creating_checker = nil
        return nil
    end

    if healthcheck_parent.checker then
        local ok, err = pcall(core.config_util.cancel_clean_handler, healthcheck_parent,
                              healthcheck_parent.checker_idx, true)
        if not ok then
            core.log.error("cancel clean handler error: ", err)
        end
    end

    core.log.info("create new checker: ", tostring(checker))

    local host = upstream.checks and upstream.checks.active and upstream.checks.active.host
    local port = upstream.checks and upstream.checks.active and upstream.checks.active.port
    local up_hdr = upstream.pass_host == "rewrite" and upstream.upstream_host
    local use_node_hdr = upstream.pass_host == "node" or nil
    for _, node in ipairs(upstream.nodes) do
        local host_hdr = up_hdr or (use_node_hdr and node.domain)
        local ok, err = checker:add_target(node.host, port or node.port, host,
                                           true, host_hdr)
        if not ok then
            core.log.error("failed to add new health check target: ", node.host, ":",
                           port or node.port, " err: ", err)
        end
    end

    local check_idx, err = core.config_util.add_clean_handler(healthcheck_parent, release_checker)
    if not check_idx then
        upstream.is_creating_checker = nil
        checker:clear()
        checker:stop()
        core.log.error("failed to add clean handler, err: ", err,
                       " healthcheck parent: ",
                       core.json.delay_encode(healthcheck_parent, true))

        return nil
    end

    healthcheck_parent.checker = checker
    healthcheck_parent.checker_upstream = upstream
    healthcheck_parent.checker_nodes_ver = upstream._nodes_ver
    healthcheck_parent.checker_idx = check_idx

    upstream.is_creating_checker = nil

    return checker
end

local function fetch_healthchecker(upstream)
    if not upstream.checks then
        return nil
    end

    return create_checker(upstream)
end

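-- Illustrative sketch, not part of the module: an upstream carrying a `checks`
-- table is what makes fetch_healthchecker() build a resty.healthcheck checker.
-- The field values below are hypothetical defaults, not a recommendation.
--
--   checks = {
--       active = {
--           type = "http",
--           http_path = "/status",
--           healthy = {interval = 2, successes = 1},
--           unhealthy = {interval = 1, http_failures = 2},
--       },
--   }
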
local function set_upstream_scheme(ctx, upstream)
    -- plugins like proxy-rewrite may already set ctx.upstream_scheme
    if not ctx.upstream_scheme then
        -- old configurations have no scheme field, so fall back to "http"
        ctx.upstream_scheme = upstream.scheme or "http"
    end

    ctx.var["upstream_scheme"] = ctx.upstream_scheme
end
_M.set_scheme = set_upstream_scheme

local scheme_to_port = {
    http = 80,
    https = 443,
    grpc = 80,
    grpcs = 443,
}


_M.scheme_to_port = scheme_to_port

local function fill_node_info(up_conf, scheme, is_stream)
    local nodes = up_conf.nodes
    if up_conf.nodes_ref == nodes then
        -- already filled
        return true
    end

    local need_filled = false
    for _, n in ipairs(nodes) do
        if not is_stream and not n.port then
            if up_conf.scheme ~= scheme then
                return nil, "Can't detect upstream's scheme. " ..
                            "You should either specify a port in the node " ..
                            "or specify the upstream.scheme explicitly"
            end

            need_filled = true
        end

        if not n.priority then
            need_filled = true
        end
    end

    if not need_filled then
        up_conf.nodes_ref = nodes
        return true
    end

    core.log.debug("fill node info for upstream: ",
                   core.json.delay_encode(up_conf, true))

    -- keep the original nodes for the slow path in `compare_upstream_node()`;
    -- we can't use `core.table.deepcopy()` for the whole `nodes` array here,
    -- because `compare_upstream_node()` compares the `metadata` of a node by address.
    up_conf.original_nodes = core.table.new(#nodes, 0)
    for i, n in ipairs(nodes) do
        up_conf.original_nodes[i] = core.table.clone(n)
        if not n.port or not n.priority then
            nodes[i] = core.table.clone(n)

            if not is_stream and not n.port then
                nodes[i].port = scheme_to_port[scheme]
            end

            -- fix the priority for non-array nodes and nodes from service discovery
            if not n.priority then
                nodes[i].priority = 0
            end
        end
    end

    up_conf.nodes_ref = nodes
    return true
end

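-- Illustrative sketch, not part of the module: how fill_node_info() completes
-- a node that omits `port` and `priority` when the upstream scheme is "https".
-- The node values are hypothetical.
--
--   before: {host = "example.com", weight = 1}
--   after:  {host = "example.com", weight = 1, port = 443, priority = 0}
--
-- The untouched copy is kept in `up_conf.original_nodes` for the slow path of
-- compare_upstream_node().
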
function _M.set_by_route(route, api_ctx)
    if api_ctx.upstream_conf then
        -- upstream_conf has already been set by the traffic-split plugin
        return
    end

    local up_conf = api_ctx.matched_upstream
    if not up_conf then
        return 503, "missing upstream configuration in Route or Service"
    end
    -- core.log.info("up_conf: ", core.json.delay_encode(up_conf, true))

    if up_conf.service_name then
        if not discovery then
            return 503, "discovery is uninitialized"
        end
        if not up_conf.discovery_type then
            return 503, "discovery_type is not specified"
        end

        local dis = discovery[up_conf.discovery_type]
        if not dis then
            local err = "discovery " .. up_conf.discovery_type .. " is uninitialized"
            return 503, err
        end

        local new_nodes, err = dis.nodes(up_conf.service_name, up_conf.discovery_args)
        if not new_nodes then
            return HTTP_CODE_UPSTREAM_UNAVAILABLE, "no valid upstream node: " .. (err or "nil")
        end

        local same = upstream_util.compare_upstream_node(up_conf, new_nodes)
        if not same then
            if not up_conf._nodes_ver then
                up_conf._nodes_ver = 0
            end
            up_conf._nodes_ver = up_conf._nodes_ver + 1

            local pass, err = core.schema.check(core.schema.discovery_nodes, new_nodes)
            if not pass then
                return HTTP_CODE_UPSTREAM_UNAVAILABLE, "invalid nodes format: " .. err
            end

            core.log.info("discover new upstream from ", up_conf.service_name, ", type ",
                          up_conf.discovery_type, ": ",
                          core.json.delay_encode(up_conf, true))
        end

        -- assign the nodes even when new_nodes equals the old value, because the
        -- discovery lib may return a new table for the same data. For example,
        -- when the watch loop of the kubernetes discovery is broken or done, it
        -- fetches the full data again and returns a new table for every service.
        up_conf.nodes = new_nodes
    end

    local id = up_conf.parent.value.id
    local conf_version = up_conf.parent.modifiedIndex
    -- include the upstream object as part of the version, because the upstream
    -- may be changed by service discovery or the dns resolver.
    set_directly(api_ctx, id, conf_version .. "#" .. tostring(up_conf) .. "#"
                              .. tostring(up_conf._nodes_ver or ''), up_conf)

    local nodes_count = up_conf.nodes and #up_conf.nodes or 0
    if nodes_count == 0 then
        release_checker(up_conf.parent)
        return HTTP_CODE_UPSTREAM_UNAVAILABLE, "no valid upstream node"
    end

    if not is_http then
        local ok, err = fill_node_info(up_conf, nil, true)
        if not ok then
            return 503, err
        end

        local scheme = up_conf.scheme
        if scheme == "tls" then
            local ok, err = set_stream_upstream_tls()
            if not ok then
                return 503, err
            end

            local sni = apisix_ssl.server_name()
            if sni then
                ngx_var.upstream_sni = sni
            end
        end

        local checker = fetch_healthchecker(up_conf)
        api_ctx.up_checker = checker
        return
    end

    set_upstream_scheme(api_ctx, up_conf)

    local ok, err = fill_node_info(up_conf, api_ctx.upstream_scheme, false)
    if not ok then
        return 503, err
    end

    local checker = fetch_healthchecker(up_conf)
    api_ctx.up_checker = checker

    local scheme = up_conf.scheme
    if (scheme == "https" or scheme == "grpcs") and up_conf.tls then

        local client_cert, client_key
        if up_conf.tls.client_cert_id then
            client_cert = api_ctx.upstream_ssl.cert
            client_key = api_ctx.upstream_ssl.key
        else
            client_cert = up_conf.tls.client_cert
            client_key = up_conf.tls.client_key
        end

        -- the sni here is just for logging
        local sni = api_ctx.var.upstream_host
        local cert, err = apisix_ssl.fetch_cert(sni, client_cert)
        if not cert then
            return 503, err
        end

        local key, err = apisix_ssl.fetch_pkey(sni, client_key)
        if not key then
            return 503, err
        end

        if scheme == "grpcs" then
            api_ctx.upstream_grpcs_cert = cert
            api_ctx.upstream_grpcs_key = key
        else
            local ok, err = set_upstream_tls_client_param(cert, key)
            if not ok then
                return 503, err
            end
        end
    end

    return
end

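-- Illustrative sketch, not part of the module: an upstream that relies on
-- service discovery, as handled by the `up_conf.service_name` branch of
-- set_by_route(). The service name and discovery type below are hypothetical.
--
--   {
--       type = "roundrobin",
--       service_name = "user-service",
--       discovery_type = "nacos",
--   }
--
-- At request time the nodes are fetched via discovery["nacos"].nodes(...) and
-- validated against core.schema.discovery_nodes before being used.
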
function _M.set_grpcs_upstream_param(ctx)
    if ctx.upstream_grpcs_cert then
        local cert = ctx.upstream_grpcs_cert
        local key = ctx.upstream_grpcs_key
        local ok, err = set_upstream_tls_client_param(cert, key)
        if not ok then
            return 503, err
        end
    end
end

function _M.upstreams()
    if not upstreams then
        return nil, nil
    end

    return upstreams.values, upstreams.conf_version
end

function _M.check_schema(conf)
    return core.schema.check(core.schema.upstream, conf)
end

local function get_chash_key_schema(hash_on)
    if not hash_on then
        return nil, "hash_on is nil"
    end

    if hash_on == "vars" then
        return core.schema.upstream_hash_vars_schema
    end

    if hash_on == "header" or hash_on == "cookie" then
        return core.schema.upstream_hash_header_schema
    end

    if hash_on == "consumer" then
        return nil, nil
    end

    if hash_on == "vars_combinations" then
        return core.schema.upstream_hash_vars_combinations_schema
    end

    return nil, "invalid hash_on type " .. hash_on
end

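-- Illustrative sketch, not part of the module: the `key` that must accompany a
-- chash upstream depends on `hash_on`, as resolved by get_chash_key_schema().
-- The key values below are hypothetical.
--
--   hash_on = "vars",     key = "remote_addr"
--   hash_on = "header",   key = "X-Session-Id"
--   hash_on = "cookie",   key = "sid"
--   hash_on = "consumer"           -- no key required
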
local function check_upstream_conf(in_dp, conf)
    if not in_dp then
        local ok, err = core.schema.check(core.schema.upstream, conf)
        if not ok then
            return false, "invalid configuration: " .. err
        end

        if conf.nodes and not core.table.isarray(conf.nodes) then
            local port
            for addr, _ in pairs(conf.nodes) do
                _, port = core.utils.parse_addr(addr)
                if port then
                    if port < 1 or port > 65535 then
                        return false, "invalid port " .. tostring(port)
                    end
                end
            end
        end

        local ssl_id = conf.tls and conf.tls.client_cert_id
        if ssl_id then
            local key = "/ssls/" .. ssl_id
            local res, err = core.etcd.get(key)
            if not res then
                return nil, "failed to fetch ssl info by "
                            .. "ssl id [" .. ssl_id .. "]: " .. err
            end

            if res.status ~= 200 then
                return nil, "failed to fetch ssl info by "
                            .. "ssl id [" .. ssl_id .. "], "
                            .. "response code: " .. res.status
            end
            if res.body and res.body.node and
                res.body.node.value and res.body.node.value.type ~= "client" then

                return nil, "failed to fetch ssl info by "
                            .. "ssl id [" .. ssl_id .. "], "
                            .. "wrong ssl type"
            end
        end

        -- encrypt the client key on the Admin API side
        if conf.tls and conf.tls.client_key then
            conf.tls.client_key = apisix_ssl.aes_encrypt_pkey(conf.tls.client_key)
        end
    end

    if is_http then
        if conf.pass_host == "rewrite" and
            (conf.upstream_host == nil or conf.upstream_host == "")
        then
            return false, "`upstream_host` can't be empty when `pass_host` is `rewrite`"
        end
    end

    if conf.tls and conf.tls.client_cert then
        local cert = conf.tls.client_cert
        local key = conf.tls.client_key
        local ok, err = apisix_ssl.validate(cert, key)
        if not ok then
            return false, err
        end
    end

    if conf.type ~= "chash" then
        return true
    end

    if conf.hash_on ~= "consumer" and not conf.key then
        return false, "missing key"
    end

    local key_schema, err = get_chash_key_schema(conf.hash_on)
    if err then
        return false, "type is chash, err: " .. err
    end

    if key_schema then
        local ok, err = core.schema.check(key_schema, conf.key)
        if not ok then
            return false, "invalid configuration: " .. err
        end
    end

    return true
end

function _M.check_upstream_conf(conf)
    return check_upstream_conf(false, conf)
end

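-- Illustrative usage sketch, not part of the module: validating an upstream
-- configuration on the control plane. The node address and weight are
-- hypothetical.
--
--   local upstream = require("apisix.upstream")
--   local ok, err = upstream.check_upstream_conf({
--       type = "roundrobin",
--       nodes = {["127.0.0.1:8080"] = 1},
--   })
--   if not ok then
--       core.log.error("bad upstream conf: ", err)
--   end
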
local function filter_upstream(value, parent)
    if not value then
        return
    end

    value.parent = parent

    if not is_http and value.scheme == "http" then
        -- For L4 proxy, the default scheme is "tcp"
        value.scheme = "tcp"
    end

    if not value.nodes then
        return
    end

    local nodes = value.nodes
    if core.table.isarray(nodes) then
        for _, node in ipairs(nodes) do
            local host = node.host
            if not core.utils.parse_ipv4(host) and
                    not core.utils.parse_ipv6(host) then
                parent.has_domain = true
                break
            end
        end
    else
        local new_nodes = core.table.new(core.table.nkeys(nodes), 0)
        for addr, weight in pairs(nodes) do
            local host, port = core.utils.parse_addr(addr)
            if not core.utils.parse_ipv4(host) and
                    not core.utils.parse_ipv6(host) then
                parent.has_domain = true
            end
            local node = {
                host = host,
                port = port,
                weight = weight,
            }
            core.table.insert(new_nodes, node)
        end
        value.nodes = new_nodes
    end
end
_M.filter_upstream = filter_upstream

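-- Illustrative sketch, not part of the module: filter_upstream() converts
-- map-style nodes into the array form used internally. Hypothetical example:
--
--   input:  nodes = {["example.com:8080"] = 100}
--   output: nodes = {{host = "example.com", port = 8080, weight = 100}}
--
-- and `parent.has_domain` is set to true because "example.com" is not an
-- IP literal.
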
function _M.init_worker()
    local err
    upstreams, err = core.config.new("/upstreams", {
        automatic = true,
        item_schema = core.schema.upstream,
        -- also check the extra fields on the DP side
        checker = function (item, schema_type)
            return check_upstream_conf(true, item)
        end,
        filter = function(upstream)
            upstream.has_domain = false

            filter_upstream(upstream.value, upstream)

            core.log.info("filter upstream: ", core.json.delay_encode(upstream, true))
        end,
    })
    if not upstreams then
        error("failed to create etcd instance for fetching upstream: " .. err)
        return
    end
end

function _M.get_by_id(up_id)
    local upstream
    local upstreams = core.config.fetch_created_obj("/upstreams")
    if upstreams then
        upstream = upstreams:get(tostring(up_id))
    end

    if not upstream then
        core.log.error("failed to find upstream by id: ", up_id)
        return nil
    end

    if upstream.has_domain then
        local err
        upstream, err = upstream_util.parse_domain_in_up(upstream)
        if err then
            core.log.error("failed to get resolved upstream: ", err)
            return nil
        end
    end

    core.log.info("parsed upstream: ", core.json.delay_encode(upstream, true))
    return upstream.dns_value or upstream.value
end

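-- Illustrative usage sketch, not part of the module: resolving an upstream
-- referenced by id from a route, e.g. `upstream_id = "1"`. The id is
-- hypothetical.
--
--   local upstream = require("apisix.upstream")
--   local up_conf = upstream.get_by_id("1")
--   if up_conf then
--       core.log.info("nodes: ", core.json.delay_encode(up_conf.nodes))
--   end
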
return _M