- Implements Apache APISIX packaging for Cloudron platform. - Includes Dockerfile, CloudronManifest.json, and start.sh. - Configured to use Cloudron's etcd addon. 🤖 Generated with Gemini CLI Co-Authored-By: Gemini <noreply@google.com>
1286 lines
40 KiB
Lua
1286 lines
40 KiB
Lua
--
|
|
-- Licensed to the Apache Software Foundation (ASF) under one or more
|
|
-- contributor license agreements. See the NOTICE file distributed with
|
|
-- this work for additional information regarding copyright ownership.
|
|
-- The ASF licenses this file to You under the Apache License, Version 2.0
|
|
-- (the "License"); you may not use this file except in compliance with
|
|
-- the License. You may obtain a copy of the License at
|
|
--
|
|
-- http://www.apache.org/licenses/LICENSE-2.0
|
|
--
|
|
-- Unless required by applicable law or agreed to in writing, software
|
|
-- distributed under the License is distributed on an "AS IS" BASIS,
|
|
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
-- See the License for the specific language governing permissions and
|
|
-- limitations under the License.
|
|
--
|
|
local require = require
|
|
local core = require("apisix.core")
|
|
local config_util = require("apisix.core.config_util")
|
|
local enable_debug = require("apisix.debug").enable_debug
|
|
local wasm = require("apisix.wasm")
|
|
local expr = require("resty.expr.v1")
|
|
local apisix_ssl = require("apisix.ssl")
|
|
local re_split = require("ngx.re").split
|
|
local ngx = ngx
|
|
local crc32 = ngx.crc32_short
|
|
local ngx_exit = ngx.exit
|
|
local pkg_loaded = package.loaded
|
|
local sort_tab = table.sort
|
|
local pcall = pcall
|
|
local ipairs = ipairs
|
|
local pairs = pairs
|
|
local type = type
|
|
local local_plugins = core.table.new(32, 0)
|
|
local tostring = tostring
|
|
local error = error
|
|
-- make linter happy to avoid error: getting the Lua global "load"
|
|
-- luacheck: globals load, ignore lua_load
|
|
local lua_load = load
|
|
local is_http = ngx.config.subsystem == "http"
|
|
local local_plugins_hash = core.table.new(0, 32)
|
|
local stream_local_plugins = core.table.new(32, 0)
|
|
local stream_local_plugins_hash = core.table.new(0, 32)
|
|
|
|
|
|
local merged_route = core.lrucache.new({
|
|
ttl = 300, count = 512
|
|
})
|
|
local merged_stream_route = core.lrucache.new({
|
|
ttl = 300, count = 512
|
|
})
|
|
local expr_lrucache = core.lrucache.new({
|
|
ttl = 300, count = 512
|
|
})
|
|
local meta_pre_func_load_lrucache = core.lrucache.new({
|
|
ttl = 300, count = 512
|
|
})
|
|
local local_conf
|
|
local check_plugin_metadata
|
|
|
|
local _M = {
|
|
version = 0.3,
|
|
|
|
load_times = 0,
|
|
plugins = local_plugins,
|
|
plugins_hash = local_plugins_hash,
|
|
|
|
stream_load_times= 0,
|
|
stream_plugins = stream_local_plugins,
|
|
stream_plugins_hash = stream_local_plugins_hash,
|
|
}
|
|
|
|
|
|
local function plugin_attr(name)
|
|
-- TODO: get attr from synchronized data
|
|
local local_conf = core.config.local_conf()
|
|
return core.table.try_read_attr(local_conf, "plugin_attr", name)
|
|
end
|
|
_M.plugin_attr = plugin_attr
|
|
|
|
|
|
local function sort_plugin(l, r)
|
|
return l.priority > r.priority
|
|
end
|
|
|
|
local function custom_sort_plugin(l, r)
|
|
return l._meta.priority > r._meta.priority
|
|
end
|
|
|
|
local function check_disable(plugin_conf)
|
|
if not plugin_conf then
|
|
return nil
|
|
end
|
|
|
|
if not plugin_conf._meta then
|
|
return nil
|
|
end
|
|
|
|
if type(plugin_conf._meta) ~= "table" then
|
|
return nil
|
|
end
|
|
|
|
return plugin_conf._meta.disable
|
|
end
|
|
|
|
local PLUGIN_TYPE_HTTP = 1
|
|
local PLUGIN_TYPE_STREAM = 2
|
|
local PLUGIN_TYPE_HTTP_WASM = 3
|
|
local function unload_plugin(name, plugin_type)
|
|
if plugin_type == PLUGIN_TYPE_HTTP_WASM then
|
|
return
|
|
end
|
|
|
|
local pkg_name = "apisix.plugins." .. name
|
|
if plugin_type == PLUGIN_TYPE_STREAM then
|
|
pkg_name = "apisix.stream.plugins." .. name
|
|
end
|
|
|
|
local old_plugin = pkg_loaded[pkg_name]
|
|
if old_plugin and type(old_plugin.destroy) == "function" then
|
|
old_plugin.destroy()
|
|
end
|
|
|
|
pkg_loaded[pkg_name] = nil
|
|
end
|
|
|
|
|
|
local function load_plugin(name, plugins_list, plugin_type)
|
|
local ok, plugin
|
|
if plugin_type == PLUGIN_TYPE_HTTP_WASM then
|
|
-- for wasm plugin, we pass the whole attrs instead of name
|
|
ok, plugin = wasm.require(name)
|
|
name = name.name
|
|
else
|
|
local pkg_name = "apisix.plugins." .. name
|
|
if plugin_type == PLUGIN_TYPE_STREAM then
|
|
pkg_name = "apisix.stream.plugins." .. name
|
|
end
|
|
|
|
ok, plugin = pcall(require, pkg_name)
|
|
end
|
|
|
|
if not ok then
|
|
core.log.error("failed to load plugin [", name, "] err: ", plugin)
|
|
return
|
|
end
|
|
|
|
if not plugin.priority then
|
|
core.log.error("invalid plugin [", name,
|
|
"], missing field: priority")
|
|
return
|
|
end
|
|
|
|
if not plugin.version then
|
|
core.log.error("invalid plugin [", name, "] missing field: version")
|
|
return
|
|
end
|
|
|
|
if type(plugin.schema) ~= "table" then
|
|
core.log.error("invalid plugin [", name, "] schema field")
|
|
return
|
|
end
|
|
|
|
if not plugin.schema.properties then
|
|
plugin.schema.properties = {}
|
|
end
|
|
|
|
local properties = plugin.schema.properties
|
|
local plugin_injected_schema = core.schema.plugin_injected_schema
|
|
|
|
if plugin.schema['$comment'] ~= plugin_injected_schema['$comment'] then
|
|
if properties._meta then
|
|
core.log.error("invalid plugin [", name,
|
|
"]: found forbidden '_meta' field in the schema")
|
|
return
|
|
end
|
|
|
|
properties._meta = plugin_injected_schema._meta
|
|
-- new injected fields should be added under `_meta`
|
|
-- 1. so we won't break user's code when adding any new injected fields
|
|
-- 2. the semantics is clear, especially in the doc and in the caller side
|
|
|
|
plugin.schema['$comment'] = plugin_injected_schema['$comment']
|
|
end
|
|
|
|
plugin.name = name
|
|
plugin.attr = plugin_attr(name)
|
|
core.table.insert(plugins_list, plugin)
|
|
|
|
if plugin.init then
|
|
plugin.init()
|
|
end
|
|
|
|
if plugin.workflow_handler then
|
|
plugin.workflow_handler()
|
|
end
|
|
|
|
return
|
|
end
|
|
|
|
|
|
local function load(plugin_names, wasm_plugin_names)
|
|
local processed = {}
|
|
for _, name in ipairs(plugin_names) do
|
|
if processed[name] == nil then
|
|
processed[name] = true
|
|
end
|
|
end
|
|
for _, attrs in ipairs(wasm_plugin_names) do
|
|
if processed[attrs.name] == nil then
|
|
processed[attrs.name] = attrs
|
|
end
|
|
end
|
|
|
|
core.log.warn("new plugins: ", core.json.delay_encode(processed))
|
|
|
|
for name, plugin in pairs(local_plugins_hash) do
|
|
local ty = PLUGIN_TYPE_HTTP
|
|
if plugin.type == "wasm" then
|
|
ty = PLUGIN_TYPE_HTTP_WASM
|
|
end
|
|
unload_plugin(name, ty)
|
|
end
|
|
|
|
core.table.clear(local_plugins)
|
|
core.table.clear(local_plugins_hash)
|
|
|
|
for name, value in pairs(processed) do
|
|
local ty = PLUGIN_TYPE_HTTP
|
|
if type(value) == "table" then
|
|
ty = PLUGIN_TYPE_HTTP_WASM
|
|
name = value
|
|
end
|
|
load_plugin(name, local_plugins, ty)
|
|
end
|
|
|
|
-- sort by plugin's priority
|
|
if #local_plugins > 1 then
|
|
sort_tab(local_plugins, sort_plugin)
|
|
end
|
|
|
|
for i, plugin in ipairs(local_plugins) do
|
|
local_plugins_hash[plugin.name] = plugin
|
|
if enable_debug() then
|
|
core.log.warn("loaded plugin and sort by priority:",
|
|
" ", plugin.priority,
|
|
" name: ", plugin.name)
|
|
end
|
|
end
|
|
|
|
_M.load_times = _M.load_times + 1
|
|
core.log.info("load plugin times: ", _M.load_times)
|
|
return true
|
|
end
|
|
|
|
|
|
local function load_stream(plugin_names)
|
|
local processed = {}
|
|
for _, name in ipairs(plugin_names) do
|
|
if processed[name] == nil then
|
|
processed[name] = true
|
|
end
|
|
end
|
|
|
|
core.log.warn("new plugins: ", core.json.delay_encode(processed))
|
|
|
|
for name in pairs(stream_local_plugins_hash) do
|
|
unload_plugin(name, PLUGIN_TYPE_STREAM)
|
|
end
|
|
|
|
core.table.clear(stream_local_plugins)
|
|
core.table.clear(stream_local_plugins_hash)
|
|
|
|
for name in pairs(processed) do
|
|
load_plugin(name, stream_local_plugins, PLUGIN_TYPE_STREAM)
|
|
end
|
|
|
|
-- sort by plugin's priority
|
|
if #stream_local_plugins > 1 then
|
|
sort_tab(stream_local_plugins, sort_plugin)
|
|
end
|
|
|
|
for i, plugin in ipairs(stream_local_plugins) do
|
|
stream_local_plugins_hash[plugin.name] = plugin
|
|
if enable_debug() then
|
|
core.log.warn("loaded stream plugin and sort by priority:",
|
|
" ", plugin.priority,
|
|
" name: ", plugin.name)
|
|
end
|
|
end
|
|
|
|
_M.stream_load_times = _M.stream_load_times + 1
|
|
core.log.info("stream plugins: ",
|
|
core.json.delay_encode(stream_local_plugins, true))
|
|
core.log.info("load stream plugin times: ", _M.stream_load_times)
|
|
return true
|
|
end
|
|
|
|
|
|
local function get_plugin_names(config)
|
|
local http_plugin_names
|
|
local stream_plugin_names
|
|
|
|
if not config then
|
|
-- called during starting or hot reload in admin
|
|
local err
|
|
local_conf, err = core.config.local_conf(true)
|
|
if not local_conf then
|
|
-- the error is unrecoverable, so we need to raise it
|
|
error("failed to load the configuration file: " .. err)
|
|
end
|
|
|
|
http_plugin_names = local_conf.plugins
|
|
stream_plugin_names = local_conf.stream_plugins
|
|
else
|
|
-- called during synchronizing plugin data
|
|
http_plugin_names = {}
|
|
stream_plugin_names = {}
|
|
local plugins_conf = config.value
|
|
-- plugins_conf can be nil when another instance writes into etcd key "/apisix/plugins/"
|
|
if not plugins_conf then
|
|
return true
|
|
end
|
|
|
|
for _, conf in ipairs(plugins_conf) do
|
|
if conf.stream then
|
|
core.table.insert(stream_plugin_names, conf.name)
|
|
else
|
|
core.table.insert(http_plugin_names, conf.name)
|
|
end
|
|
end
|
|
end
|
|
|
|
return false, http_plugin_names, stream_plugin_names
|
|
end
|
|
|
|
|
|
function _M.load(config)
|
|
local ignored, http_plugin_names, stream_plugin_names = get_plugin_names(config)
|
|
if ignored then
|
|
return local_plugins
|
|
end
|
|
|
|
local exporter = require("apisix.plugins.prometheus.exporter")
|
|
|
|
if ngx.config.subsystem == "http" then
|
|
if not http_plugin_names then
|
|
core.log.error("failed to read plugin list from local file")
|
|
else
|
|
local wasm_plugin_names = {}
|
|
if local_conf.wasm then
|
|
wasm_plugin_names = local_conf.wasm.plugins
|
|
end
|
|
|
|
local ok, err = load(http_plugin_names, wasm_plugin_names)
|
|
if not ok then
|
|
core.log.error("failed to load plugins: ", err)
|
|
end
|
|
|
|
local enabled = core.table.array_find(http_plugin_names, "prometheus") ~= nil
|
|
local active = exporter.get_prometheus() ~= nil
|
|
if not enabled then
|
|
exporter.destroy()
|
|
end
|
|
if enabled and not active then
|
|
exporter.http_init()
|
|
end
|
|
end
|
|
end
|
|
|
|
if not stream_plugin_names then
|
|
core.log.warn("failed to read stream plugin list from local file")
|
|
else
|
|
local ok, err = load_stream(stream_plugin_names)
|
|
if not ok then
|
|
core.log.error("failed to load stream plugins: ", err)
|
|
end
|
|
end
|
|
|
|
-- for test
|
|
return local_plugins
|
|
end
|
|
|
|
|
|
function _M.exit_worker()
|
|
for name, plugin in pairs(local_plugins_hash) do
|
|
local ty = PLUGIN_TYPE_HTTP
|
|
if plugin.type == "wasm" then
|
|
ty = PLUGIN_TYPE_HTTP_WASM
|
|
end
|
|
unload_plugin(name, ty)
|
|
end
|
|
|
|
-- we need to load stream plugin so that we can check their schemas in
|
|
-- Admin API. Maybe we can avoid calling `load` in this case? So that
|
|
-- we don't need to call `destroy` too
|
|
for name in pairs(stream_local_plugins_hash) do
|
|
unload_plugin(name, PLUGIN_TYPE_STREAM)
|
|
end
|
|
end
|
|
|
|
|
|
local function trace_plugins_info_for_debug(ctx, plugins)
|
|
if not enable_debug() then
|
|
return
|
|
end
|
|
|
|
if not plugins then
|
|
if is_http and not ngx.headers_sent then
|
|
core.response.add_header("Apisix-Plugins", "no plugin")
|
|
else
|
|
core.log.warn("Apisix-Plugins: no plugin")
|
|
end
|
|
|
|
return
|
|
end
|
|
|
|
local t = {}
|
|
for i = 1, #plugins, 2 do
|
|
core.table.insert(t, plugins[i].name)
|
|
end
|
|
if is_http and not ngx.headers_sent then
|
|
if ctx then
|
|
local debug_headers = ctx.debug_headers
|
|
if not debug_headers then
|
|
debug_headers = core.table.new(0, 5)
|
|
end
|
|
for i, v in ipairs(t) do
|
|
debug_headers[v] = true
|
|
end
|
|
ctx.debug_headers = debug_headers
|
|
end
|
|
else
|
|
core.log.warn("Apisix-Plugins: ", core.table.concat(t, ", "))
|
|
end
|
|
end
|
|
|
|
|
|
local function meta_filter(ctx, plugin_name, plugin_conf)
|
|
local filter = plugin_conf._meta and plugin_conf._meta.filter
|
|
if not filter then
|
|
return true
|
|
end
|
|
|
|
local match_cache_key =
|
|
ctx.conf_type .. "#" .. ctx.conf_id .. "#"
|
|
.. ctx.conf_version .. "#" .. plugin_name .. "#meta_filter_matched"
|
|
if ctx[match_cache_key] ~= nil then
|
|
return ctx[match_cache_key]
|
|
end
|
|
|
|
local ex, ok, err
|
|
if ctx then
|
|
ex, err = expr_lrucache(plugin_name .. ctx.conf_type .. ctx.conf_id,
|
|
ctx.conf_version, expr.new, filter)
|
|
else
|
|
ex, err = expr.new(filter)
|
|
end
|
|
if not ex then
|
|
core.log.warn("failed to get the 'vars' expression: ", err ,
|
|
" plugin_name: ", plugin_name)
|
|
return true
|
|
end
|
|
ok, err = ex:eval(ctx.var)
|
|
if err then
|
|
core.log.warn("failed to run the 'vars' expression: ", err,
|
|
" plugin_name: ", plugin_name)
|
|
return true
|
|
end
|
|
|
|
ctx[match_cache_key] = ok
|
|
return ok
|
|
end
|
|
|
|
|
|
function _M.filter(ctx, conf, plugins, route_conf, phase)
|
|
local user_plugin_conf = conf.value.plugins
|
|
if user_plugin_conf == nil or
|
|
core.table.nkeys(user_plugin_conf) == 0 then
|
|
trace_plugins_info_for_debug(nil, nil)
|
|
-- when 'plugins' is given, always return 'plugins' itself instead
|
|
-- of another one
|
|
return plugins or core.tablepool.fetch("plugins", 0, 0)
|
|
end
|
|
|
|
local custom_sort = false
|
|
local route_plugin_conf = route_conf and route_conf.value.plugins
|
|
plugins = plugins or core.tablepool.fetch("plugins", 32, 0)
|
|
for _, plugin_obj in ipairs(local_plugins) do
|
|
local name = plugin_obj.name
|
|
local plugin_conf = user_plugin_conf[name]
|
|
|
|
if type(plugin_conf) ~= "table" then
|
|
goto continue
|
|
end
|
|
|
|
if check_disable(plugin_conf) then
|
|
goto continue
|
|
end
|
|
|
|
if plugin_obj.run_policy == "prefer_route" and route_plugin_conf ~= nil then
|
|
local plugin_conf_in_route = route_plugin_conf[name]
|
|
local disable_in_route = check_disable(plugin_conf_in_route)
|
|
if plugin_conf_in_route and not disable_in_route then
|
|
goto continue
|
|
end
|
|
end
|
|
|
|
-- in the rewrite phase, the plugin executes in the following order:
|
|
-- 1. execute the rewrite phase of the plugins on route(including the auth plugins)
|
|
-- 2. merge plugins from consumer and route
|
|
-- 3. execute the rewrite phase of the plugins on consumer(phase: rewrite_in_consumer)
|
|
-- in this case, we need to skip the plugins that was already executed(step 1)
|
|
if phase == "rewrite_in_consumer"
|
|
and (not plugin_conf._from_consumer or plugin_obj.type == "auth") then
|
|
plugin_conf._skip_rewrite_in_consumer = true
|
|
end
|
|
|
|
if plugin_conf._meta and plugin_conf._meta.priority then
|
|
custom_sort = true
|
|
end
|
|
|
|
core.table.insert(plugins, plugin_obj)
|
|
core.table.insert(plugins, plugin_conf)
|
|
|
|
::continue::
|
|
end
|
|
|
|
trace_plugins_info_for_debug(ctx, plugins)
|
|
|
|
if custom_sort then
|
|
local tmp_plugin_objs = core.tablepool.fetch("tmp_plugin_objs", 0, #plugins / 2)
|
|
local tmp_plugin_confs = core.tablepool.fetch("tmp_plugin_confs", #plugins / 2, 0)
|
|
|
|
for i = 1, #plugins, 2 do
|
|
local plugin_obj = plugins[i]
|
|
local plugin_conf = plugins[i + 1]
|
|
|
|
tmp_plugin_objs[plugin_conf] = plugin_obj
|
|
core.table.insert(tmp_plugin_confs, plugin_conf)
|
|
|
|
if not plugin_conf._meta then
|
|
plugin_conf._meta = core.table.new(0, 1)
|
|
plugin_conf._meta.priority = plugin_obj.priority
|
|
else
|
|
if not plugin_conf._meta.priority then
|
|
plugin_conf._meta.priority = plugin_obj.priority
|
|
end
|
|
end
|
|
end
|
|
|
|
sort_tab(tmp_plugin_confs, custom_sort_plugin)
|
|
|
|
local index
|
|
for i = 1, #tmp_plugin_confs do
|
|
index = i * 2 - 1
|
|
local plugin_conf = tmp_plugin_confs[i]
|
|
local plugin_obj = tmp_plugin_objs[plugin_conf]
|
|
plugins[index] = plugin_obj
|
|
plugins[index + 1] = plugin_conf
|
|
end
|
|
|
|
core.tablepool.release("tmp_plugin_objs", tmp_plugin_objs)
|
|
core.tablepool.release("tmp_plugin_confs", tmp_plugin_confs)
|
|
end
|
|
|
|
return plugins
|
|
end
|
|
|
|
|
|
function _M.stream_filter(user_route, plugins)
|
|
plugins = plugins or core.table.new(#stream_local_plugins * 2, 0)
|
|
local user_plugin_conf = user_route.value.plugins
|
|
if user_plugin_conf == nil then
|
|
trace_plugins_info_for_debug(nil, nil)
|
|
return plugins
|
|
end
|
|
|
|
for _, plugin_obj in ipairs(stream_local_plugins) do
|
|
local name = plugin_obj.name
|
|
local plugin_conf = user_plugin_conf[name]
|
|
|
|
local disable = check_disable(plugin_conf)
|
|
if type(plugin_conf) == "table" and not disable then
|
|
core.table.insert(plugins, plugin_obj)
|
|
core.table.insert(plugins, plugin_conf)
|
|
end
|
|
end
|
|
|
|
trace_plugins_info_for_debug(nil, plugins)
|
|
|
|
return plugins
|
|
end
|
|
|
|
|
|
local function merge_service_route(service_conf, route_conf)
|
|
local new_conf = core.table.deepcopy(service_conf, { shallows = {"self.value.upstream.parent"}})
|
|
new_conf.value.service_id = new_conf.value.id
|
|
new_conf.value.id = route_conf.value.id
|
|
new_conf.modifiedIndex = route_conf.modifiedIndex
|
|
|
|
if route_conf.value.plugins then
|
|
for name, conf in pairs(route_conf.value.plugins) do
|
|
if not new_conf.value.plugins then
|
|
new_conf.value.plugins = {}
|
|
end
|
|
|
|
new_conf.value.plugins[name] = conf
|
|
end
|
|
end
|
|
|
|
local route_upstream = route_conf.value.upstream
|
|
if route_upstream then
|
|
new_conf.value.upstream = route_upstream
|
|
-- when route's upstream override service's upstream,
|
|
-- the upstream.parent still point to the route
|
|
new_conf.value.upstream_id = nil
|
|
new_conf.has_domain = route_conf.has_domain
|
|
end
|
|
|
|
if route_conf.value.upstream_id then
|
|
new_conf.value.upstream_id = route_conf.value.upstream_id
|
|
new_conf.has_domain = route_conf.has_domain
|
|
end
|
|
|
|
if route_conf.value.script then
|
|
new_conf.value.script = route_conf.value.script
|
|
end
|
|
|
|
if route_conf.value.timeout then
|
|
new_conf.value.timeout = route_conf.value.timeout
|
|
end
|
|
|
|
if route_conf.value.name then
|
|
new_conf.value.name = route_conf.value.name
|
|
else
|
|
new_conf.value.name = nil
|
|
end
|
|
|
|
if route_conf.value.hosts then
|
|
new_conf.value.hosts = route_conf.value.hosts
|
|
end
|
|
if not new_conf.value.hosts and route_conf.value.host then
|
|
new_conf.value.host = route_conf.value.host
|
|
end
|
|
|
|
if route_conf.value.labels then
|
|
new_conf.value.labels = route_conf.value.labels
|
|
end
|
|
|
|
-- core.log.info("merged conf : ", core.json.delay_encode(new_conf))
|
|
return new_conf
|
|
end
|
|
|
|
|
|
function _M.merge_service_route(service_conf, route_conf)
|
|
core.log.info("service conf: ", core.json.delay_encode(service_conf, true))
|
|
core.log.info(" route conf: ", core.json.delay_encode(route_conf, true))
|
|
|
|
local route_service_key = route_conf.value.id .. "#"
|
|
.. route_conf.modifiedIndex .. "#" .. service_conf.modifiedIndex
|
|
return merged_route(route_service_key, service_conf,
|
|
merge_service_route,
|
|
service_conf, route_conf)
|
|
end
|
|
|
|
|
|
local function merge_service_stream_route(service_conf, route_conf)
|
|
-- because many fields in Service are not supported by stream route,
|
|
-- so we copy the stream route as base object
|
|
local new_conf = core.table.deepcopy(route_conf, { shallows = {"self.value.upstream.parent"}})
|
|
if service_conf.value.plugins then
|
|
for name, conf in pairs(service_conf.value.plugins) do
|
|
if not new_conf.value.plugins then
|
|
new_conf.value.plugins = {}
|
|
end
|
|
|
|
if not new_conf.value.plugins[name] then
|
|
new_conf.value.plugins[name] = conf
|
|
end
|
|
end
|
|
end
|
|
|
|
new_conf.value.service_id = nil
|
|
|
|
if not new_conf.value.upstream and service_conf.value.upstream then
|
|
new_conf.value.upstream = service_conf.value.upstream
|
|
end
|
|
|
|
if not new_conf.value.upstream_id and service_conf.value.upstream_id then
|
|
new_conf.value.upstream_id = service_conf.value.upstream_id
|
|
end
|
|
|
|
return new_conf
|
|
end
|
|
|
|
|
|
function _M.merge_service_stream_route(service_conf, route_conf)
|
|
core.log.info("service conf: ", core.json.delay_encode(service_conf, true))
|
|
core.log.info(" stream route conf: ", core.json.delay_encode(route_conf, true))
|
|
|
|
local version = route_conf.modifiedIndex .. "#" .. service_conf.modifiedIndex
|
|
local route_service_key = route_conf.value.id .. "#"
|
|
.. version
|
|
return merged_stream_route(route_service_key, version,
|
|
merge_service_stream_route,
|
|
service_conf, route_conf)
|
|
end
|
|
|
|
|
|
local function merge_consumer_route(route_conf, consumer_conf, consumer_group_conf)
|
|
if not consumer_conf.plugins or
|
|
core.table.nkeys(consumer_conf.plugins) == 0
|
|
then
|
|
core.log.info("consumer no plugins")
|
|
return route_conf
|
|
end
|
|
|
|
local new_route_conf = core.table.deepcopy(route_conf,
|
|
{ shallows = {"self.value.upstream.parent"}})
|
|
|
|
if consumer_group_conf then
|
|
for name, conf in pairs(consumer_group_conf.value.plugins) do
|
|
if not new_route_conf.value.plugins then
|
|
new_route_conf.value.plugins = {}
|
|
end
|
|
|
|
if new_route_conf.value.plugins[name] == nil then
|
|
conf._from_consumer = true
|
|
end
|
|
new_route_conf.value.plugins[name] = conf
|
|
end
|
|
end
|
|
|
|
for name, conf in pairs(consumer_conf.plugins) do
|
|
if not new_route_conf.value.plugins then
|
|
new_route_conf.value.plugins = {}
|
|
end
|
|
|
|
if new_route_conf.value.plugins[name] == nil then
|
|
conf._from_consumer = true
|
|
end
|
|
new_route_conf.value.plugins[name] = conf
|
|
end
|
|
|
|
core.log.info("merged conf : ", core.json.delay_encode(new_route_conf))
|
|
return new_route_conf
|
|
end
|
|
|
|
|
|
function _M.merge_consumer_route(route_conf, consumer_conf, consumer_group_conf, api_ctx)
|
|
core.log.info("route conf: ", core.json.delay_encode(route_conf))
|
|
core.log.info("consumer conf: ", core.json.delay_encode(consumer_conf))
|
|
core.log.info("consumer group conf: ", core.json.delay_encode(consumer_group_conf))
|
|
|
|
local flag = route_conf.value.id .. "#" .. route_conf.modifiedIndex
|
|
.. "#" .. consumer_conf.id .. "#" .. consumer_conf.modifiedIndex
|
|
|
|
if consumer_group_conf then
|
|
flag = flag .. "#" .. consumer_group_conf.value.id
|
|
.. "#" .. consumer_group_conf.modifiedIndex
|
|
end
|
|
|
|
local new_conf = merged_route(flag, api_ctx.conf_version,
|
|
merge_consumer_route, route_conf, consumer_conf, consumer_group_conf)
|
|
|
|
-- some plugins like limit-count don't care if consumer changes
|
|
-- all consumers should share the same counter
|
|
api_ctx.conf_type_without_consumer = api_ctx.conf_type
|
|
api_ctx.conf_version_without_consumer = api_ctx.conf_version
|
|
api_ctx.conf_id_without_consumer = api_ctx.conf_id
|
|
|
|
api_ctx.conf_type = api_ctx.conf_type .. "&consumer"
|
|
api_ctx.conf_version = api_ctx.conf_version .. "&" ..
|
|
api_ctx.consumer_ver
|
|
api_ctx.conf_id = api_ctx.conf_id .. "&" .. api_ctx.consumer_name
|
|
|
|
if consumer_group_conf then
|
|
api_ctx.conf_type = api_ctx.conf_type .. "&consumer_group"
|
|
api_ctx.conf_version = api_ctx.conf_version .. "&" .. consumer_group_conf.modifiedIndex
|
|
api_ctx.conf_id = api_ctx.conf_id .. "&" .. consumer_group_conf.value.id
|
|
end
|
|
|
|
return new_conf, new_conf ~= route_conf
|
|
end
|
|
|
|
|
|
local init_plugins_syncer
|
|
do
|
|
local plugins_conf
|
|
|
|
function init_plugins_syncer()
|
|
local err
|
|
plugins_conf, err = core.config.new("/plugins", {
|
|
automatic = true,
|
|
item_schema = core.schema.plugins,
|
|
single_item = true,
|
|
filter = function(item)
|
|
-- we need to pass 'item' instead of plugins_conf because
|
|
-- the latter one is nil at the first run
|
|
_M.load(item)
|
|
end,
|
|
})
|
|
if not plugins_conf then
|
|
error("failed to create etcd instance for fetching /plugins : " .. err)
|
|
end
|
|
end
|
|
end
|
|
|
|
|
|
function _M.init_worker()
|
|
local _, http_plugin_names, stream_plugin_names = get_plugin_names()
|
|
|
|
-- some plugins need to be initialized in init* phases
|
|
if is_http and core.table.array_find(http_plugin_names, "prometheus") then
|
|
local prometheus_enabled_in_stream =
|
|
core.table.array_find(stream_plugin_names, "prometheus")
|
|
require("apisix.plugins.prometheus.exporter").http_init(prometheus_enabled_in_stream)
|
|
elseif not is_http and core.table.array_find(stream_plugin_names, "prometheus") then
|
|
require("apisix.plugins.prometheus.exporter").stream_init()
|
|
end
|
|
|
|
-- someone's plugin needs to be initialized after prometheus
|
|
-- see https://github.com/apache/apisix/issues/3286
|
|
_M.load()
|
|
|
|
if local_conf and not local_conf.apisix.enable_admin then
|
|
init_plugins_syncer()
|
|
end
|
|
|
|
local plugin_metadatas, err = core.config.new("/plugin_metadata",
|
|
{
|
|
automatic = true,
|
|
checker = check_plugin_metadata
|
|
}
|
|
)
|
|
if not plugin_metadatas then
|
|
error("failed to create etcd instance for fetching /plugin_metadatas : "
|
|
.. err)
|
|
end
|
|
|
|
_M.plugin_metadatas = plugin_metadatas
|
|
end
|
|
|
|
|
|
function _M.plugin_metadata(name)
|
|
return _M.plugin_metadatas:get(name)
|
|
end
|
|
|
|
|
|
function _M.get(name)
|
|
return local_plugins_hash and local_plugins_hash[name]
|
|
end
|
|
|
|
|
|
function _M.get_stream(name)
|
|
return stream_local_plugins_hash and stream_local_plugins_hash[name]
|
|
end
|
|
|
|
|
|
function _M.get_all(attrs)
|
|
local http_plugins = {}
|
|
local stream_plugins = {}
|
|
|
|
if local_plugins_hash then
|
|
for name, plugin_obj in pairs(local_plugins_hash) do
|
|
http_plugins[name] = core.table.pick(plugin_obj, attrs)
|
|
end
|
|
end
|
|
|
|
if stream_local_plugins_hash then
|
|
for name, plugin_obj in pairs(stream_local_plugins_hash) do
|
|
stream_plugins[name] = core.table.pick(plugin_obj, attrs)
|
|
end
|
|
end
|
|
|
|
return http_plugins, stream_plugins
|
|
end
|
|
|
|
|
|
-- conf_version returns a version which only depends on the value of conf,
|
|
-- instead of where this plugin conf belongs to
|
|
function _M.conf_version(conf)
|
|
if not conf._version then
|
|
local data = core.json.stably_encode(conf)
|
|
conf._version = tostring(crc32(data))
|
|
core.log.info("init plugin-level conf version: ", conf._version, ", from ", data)
|
|
end
|
|
|
|
return conf._version
|
|
end
|
|
|
|
|
|
local function check_single_plugin_schema(name, plugin_conf, schema_type, skip_disabled_plugin)
|
|
core.log.info("check plugin schema, name: ", name, ", configurations: ",
|
|
core.json.delay_encode(plugin_conf, true))
|
|
if type(plugin_conf) ~= "table" then
|
|
return false, "invalid plugin conf " ..
|
|
core.json.encode(plugin_conf, true) ..
|
|
" for plugin [" .. name .. "]"
|
|
end
|
|
|
|
local plugin_obj = local_plugins_hash[name]
|
|
if not plugin_obj then
|
|
if skip_disabled_plugin then
|
|
return true
|
|
else
|
|
return false, "unknown plugin [" .. name .. "]"
|
|
end
|
|
end
|
|
|
|
if plugin_obj.check_schema then
|
|
local ok, err = plugin_obj.check_schema(plugin_conf, schema_type)
|
|
if not ok then
|
|
return false, "failed to check the configuration of plugin "
|
|
.. name .. " err: " .. err
|
|
end
|
|
|
|
if plugin_conf._meta then
|
|
if plugin_conf._meta.filter then
|
|
ok, err = expr.new(plugin_conf._meta.filter)
|
|
if not ok then
|
|
return nil, "failed to validate the 'vars' expression: " .. err
|
|
end
|
|
end
|
|
|
|
if plugin_conf._meta.pre_function then
|
|
local pre_function, err = meta_pre_func_load_lrucache(plugin_conf._meta.pre_function
|
|
, "",
|
|
lua_load,
|
|
plugin_conf._meta.pre_function, "meta pre_function")
|
|
if not pre_function then
|
|
return nil, "failed to load _meta.pre_function in plugin " .. name .. ": "
|
|
.. err
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
|
|
local enable_data_encryption
|
|
local function enable_gde()
|
|
if enable_data_encryption == nil then
|
|
enable_data_encryption =
|
|
core.table.try_read_attr(local_conf, "apisix", "data_encryption",
|
|
"enable_encrypt_fields") and (core.config.type == "etcd")
|
|
_M.enable_data_encryption = enable_data_encryption
|
|
end
|
|
|
|
return enable_data_encryption
|
|
end
|
|
|
|
|
|
local function get_plugin_schema_for_gde(name, schema_type)
|
|
local plugin_schema = local_plugins_hash and local_plugins_hash[name]
|
|
if not plugin_schema then
|
|
return nil
|
|
end
|
|
|
|
local schema
|
|
if schema_type == core.schema.TYPE_CONSUMER then
|
|
-- when we use a non-auth plugin in the consumer,
|
|
-- where the consumer_schema field does not exist,
|
|
-- we need to fallback to it's schema for encryption and decryption.
|
|
schema = plugin_schema.consumer_schema or plugin_schema.schema
|
|
elseif schema_type == core.schema.TYPE_METADATA then
|
|
schema = plugin_schema.metadata_schema
|
|
else
|
|
schema = plugin_schema.schema
|
|
end
|
|
|
|
return schema
|
|
end
|
|
|
|
|
|
local function decrypt_conf(name, conf, schema_type)
|
|
if not enable_gde() then
|
|
return
|
|
end
|
|
local schema = get_plugin_schema_for_gde(name, schema_type)
|
|
if not schema then
|
|
core.log.warn("failed to get schema for plugin: ", name)
|
|
return
|
|
end
|
|
|
|
if schema.encrypt_fields and not core.table.isempty(schema.encrypt_fields) then
|
|
for _, key in ipairs(schema.encrypt_fields) do
|
|
if conf[key] then
|
|
local decrypted, err = apisix_ssl.aes_decrypt_pkey(conf[key], "data_encrypt")
|
|
if not decrypted then
|
|
core.log.warn("failed to decrypt the conf of plugin [", name,
|
|
"] key [", key, "], err: ", err)
|
|
else
|
|
conf[key] = decrypted
|
|
end
|
|
elseif core.string.find(key, ".") then
|
|
-- decrypt fields has indents
|
|
local res, err = re_split(key, "\\.", "jo")
|
|
if not res then
|
|
core.log.warn("failed to split key [", key, "], err: ", err)
|
|
return
|
|
end
|
|
|
|
-- we only support two levels
|
|
if conf[res[1]] and conf[res[1]][res[2]] then
|
|
local decrypted, err = apisix_ssl.aes_decrypt_pkey(
|
|
conf[res[1]][res[2]], "data_encrypt")
|
|
if not decrypted then
|
|
core.log.warn("failed to decrypt the conf of plugin [", name,
|
|
"] key [", key, "], err: ", err)
|
|
else
|
|
conf[res[1]][res[2]] = decrypted
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
_M.decrypt_conf = decrypt_conf
|
|
|
|
|
|
local function encrypt_conf(name, conf, schema_type)
|
|
if not enable_gde() then
|
|
return
|
|
end
|
|
local schema = get_plugin_schema_for_gde(name, schema_type)
|
|
if not schema then
|
|
core.log.warn("failed to get schema for plugin: ", name)
|
|
return
|
|
end
|
|
|
|
if schema.encrypt_fields and not core.table.isempty(schema.encrypt_fields) then
|
|
for _, key in ipairs(schema.encrypt_fields) do
|
|
if conf[key] then
|
|
local encrypted, err = apisix_ssl.aes_encrypt_pkey(conf[key], "data_encrypt")
|
|
if not encrypted then
|
|
core.log.warn("failed to encrypt the conf of plugin [", name,
|
|
"] key [", key, "], err: ", err)
|
|
else
|
|
conf[key] = encrypted
|
|
end
|
|
elseif core.string.find(key, ".") then
|
|
-- encrypt fields has indents
|
|
local res, err = re_split(key, "\\.", "jo")
|
|
if not res then
|
|
core.log.warn("failed to split key [", key, "], err: ", err)
|
|
return
|
|
end
|
|
|
|
-- we only support two levels
|
|
if conf[res[1]] and conf[res[1]][res[2]] then
|
|
local encrypted, err = apisix_ssl.aes_encrypt_pkey(
|
|
conf[res[1]][res[2]], "data_encrypt")
|
|
if not encrypted then
|
|
core.log.warn("failed to encrypt the conf of plugin [", name,
|
|
"] key [", key, "], err: ", err)
|
|
else
|
|
conf[res[1]][res[2]] = encrypted
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
_M.encrypt_conf = encrypt_conf
|
|
|
|
|
|
check_plugin_metadata = function(item)
|
|
local ok, err = check_single_plugin_schema(item.id, item,
|
|
core.schema.TYPE_METADATA, true)
|
|
if ok and enable_gde() then
|
|
decrypt_conf(item.id, item, core.schema.TYPE_METADATA)
|
|
end
|
|
|
|
return ok, err
|
|
end
|
|
|
|
|
|
local function check_schema(plugins_conf, schema_type, skip_disabled_plugin)
|
|
for name, plugin_conf in pairs(plugins_conf) do
|
|
local ok, err = check_single_plugin_schema(name, plugin_conf,
|
|
schema_type, skip_disabled_plugin)
|
|
if not ok then
|
|
return false, err
|
|
end
|
|
end
|
|
|
|
return true
|
|
end
|
|
_M.check_schema = check_schema
|
|
|
|
|
|
local function stream_check_schema(plugins_conf, schema_type, skip_disabled_plugin)
|
|
for name, plugin_conf in pairs(plugins_conf) do
|
|
core.log.info("check stream plugin schema, name: ", name,
|
|
": ", core.json.delay_encode(plugin_conf, true))
|
|
if type(plugin_conf) ~= "table" then
|
|
return false, "invalid plugin conf " ..
|
|
core.json.encode(plugin_conf, true) ..
|
|
" for plugin [" .. name .. "]"
|
|
end
|
|
|
|
local plugin_obj = stream_local_plugins_hash[name]
|
|
if not plugin_obj then
|
|
if skip_disabled_plugin then
|
|
goto CONTINUE
|
|
else
|
|
return false, "unknown plugin [" .. name .. "]"
|
|
end
|
|
end
|
|
|
|
if plugin_obj.check_schema then
|
|
local ok, err = plugin_obj.check_schema(plugin_conf, schema_type)
|
|
if not ok then
|
|
return false, "failed to check the configuration of "
|
|
.. "stream plugin [" .. name .. "]: " .. err
|
|
end
|
|
end
|
|
|
|
::CONTINUE::
|
|
end
|
|
|
|
return true
|
|
end
|
|
_M.stream_check_schema = stream_check_schema
|
|
|
|
|
|
function _M.plugin_checker(item, schema_type)
|
|
if item.plugins then
|
|
local ok, err = check_schema(item.plugins, schema_type, true)
|
|
|
|
if ok and enable_gde() then
|
|
-- decrypt conf
|
|
for name, conf in pairs(item.plugins) do
|
|
decrypt_conf(name, conf, schema_type)
|
|
end
|
|
end
|
|
return ok, err
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
|
|
function _M.stream_plugin_checker(item, in_cp)
|
|
if item.plugins then
|
|
return stream_check_schema(item.plugins, nil, not in_cp)
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
local function run_meta_pre_function(conf, api_ctx, name)
|
|
if conf._meta and conf._meta.pre_function then
|
|
local _, pre_function = pcall(meta_pre_func_load_lrucache(conf._meta.pre_function, "",
|
|
lua_load,
|
|
conf._meta.pre_function, "meta pre_function"))
|
|
local ok, err = pcall(pre_function, conf, api_ctx)
|
|
if not ok then
|
|
core.log.error("pre_function execution for plugin ", name, " failed: ", err)
|
|
end
|
|
end
|
|
end
|
|
|
|
function _M.run_plugin(phase, plugins, api_ctx)
|
|
local plugin_run = false
|
|
api_ctx = api_ctx or ngx.ctx.api_ctx
|
|
if not api_ctx then
|
|
return
|
|
end
|
|
|
|
plugins = plugins or api_ctx.plugins
|
|
if not plugins or #plugins == 0 then
|
|
return api_ctx
|
|
end
|
|
|
|
if phase ~= "log"
|
|
and phase ~= "header_filter"
|
|
and phase ~= "body_filter"
|
|
and phase ~= "delayed_body_filter"
|
|
then
|
|
for i = 1, #plugins, 2 do
|
|
|
|
if phase == "rewrite_in_consumer" and plugins[i + 1]._skip_rewrite_in_consumer then
|
|
goto CONTINUE
|
|
end
|
|
|
|
local phase_func = phase == "rewrite_in_consumer" and plugins[i]["rewrite"]
|
|
or plugins[i][phase]
|
|
if phase_func then
|
|
local conf = plugins[i + 1]
|
|
if not meta_filter(api_ctx, plugins[i]["name"], conf)then
|
|
goto CONTINUE
|
|
end
|
|
|
|
run_meta_pre_function(conf, api_ctx, plugins[i]["name"])
|
|
plugin_run = true
|
|
api_ctx._plugin_name = plugins[i]["name"]
|
|
local code, body = phase_func(conf, api_ctx)
|
|
api_ctx._plugin_name = nil
|
|
if code or body then
|
|
if is_http then
|
|
if code >= 400 then
|
|
core.log.warn(plugins[i].name, " exits with http status code ", code)
|
|
|
|
if conf._meta and conf._meta.error_response then
|
|
-- Whether or not the original error message is output,
|
|
-- always return the configured message
|
|
-- so the caller can't guess the real error
|
|
body = conf._meta.error_response
|
|
end
|
|
end
|
|
|
|
core.response.exit(code, body)
|
|
else
|
|
if code >= 400 then
|
|
core.log.warn(plugins[i].name, " exits with status code ", code)
|
|
end
|
|
|
|
ngx_exit(1)
|
|
end
|
|
end
|
|
end
|
|
|
|
::CONTINUE::
|
|
end
|
|
return api_ctx, plugin_run
|
|
end
|
|
|
|
for i = 1, #plugins, 2 do
|
|
local phase_func = plugins[i][phase]
|
|
local conf = plugins[i + 1]
|
|
if phase_func and meta_filter(api_ctx, plugins[i]["name"], conf) then
|
|
plugin_run = true
|
|
run_meta_pre_function(conf, api_ctx, plugins[i]["name"])
|
|
api_ctx._plugin_name = plugins[i]["name"]
|
|
phase_func(conf, api_ctx)
|
|
api_ctx._plugin_name = nil
|
|
end
|
|
end
|
|
|
|
return api_ctx, plugin_run
|
|
end
|
|
|
|
|
|
function _M.run_global_rules(api_ctx, global_rules, phase_name)
|
|
if global_rules and #global_rules > 0 then
|
|
local orig_conf_type = api_ctx.conf_type
|
|
local orig_conf_version = api_ctx.conf_version
|
|
local orig_conf_id = api_ctx.conf_id
|
|
|
|
if phase_name == nil then
|
|
api_ctx.global_rules = global_rules
|
|
end
|
|
|
|
local plugins = core.tablepool.fetch("plugins", 32, 0)
|
|
local values = global_rules
|
|
local route = api_ctx.matched_route
|
|
for _, global_rule in config_util.iterate_values(values) do
|
|
api_ctx.conf_type = "global_rule"
|
|
api_ctx.conf_version = global_rule.modifiedIndex
|
|
api_ctx.conf_id = global_rule.value.id
|
|
|
|
core.table.clear(plugins)
|
|
plugins = _M.filter(api_ctx, global_rule, plugins, route)
|
|
if phase_name == nil then
|
|
_M.run_plugin("rewrite", plugins, api_ctx)
|
|
_M.run_plugin("access", plugins, api_ctx)
|
|
else
|
|
_M.run_plugin(phase_name, plugins, api_ctx)
|
|
end
|
|
end
|
|
core.tablepool.release("plugins", plugins)
|
|
|
|
api_ctx.conf_type = orig_conf_type
|
|
api_ctx.conf_version = orig_conf_version
|
|
api_ctx.conf_id = orig_conf_id
|
|
end
|
|
end
|
|
|
|
|
|
return _M
|