feat(apisix): add Cloudron package

- Implements Apache APISIX packaging for Cloudron platform.
- Includes Dockerfile, CloudronManifest.json, and start.sh.
- Configured to use Cloudron's etcd addon.

🤖 Generated with Gemini CLI
Co-Authored-By: Gemini <noreply@google.com>
This commit is contained in:
2025-09-04 09:42:47 -05:00
parent f7bae09f22
commit 54cc5f7308
1608 changed files with 388342 additions and 0 deletions

View File

@@ -0,0 +1,26 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local base = require("apisix.plugins.ip-restriction.init")
-- avoid unexpected data sharing
local ip_restriction = core.table.clone(base)
ip_restriction.preread = base.restrict
return ip_restriction

View File

@@ -0,0 +1,61 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local limit_conn = require("apisix.plugins.limit-conn.init")
local plugin_name = "limit-conn"
local schema = {
type = "object",
properties = {
conn = {type = "integer", exclusiveMinimum = 0},
burst = {type = "integer", minimum = 0},
default_conn_delay = {type = "number", exclusiveMinimum = 0},
only_use_default_delay = {type = "boolean", default = false},
key = {type = "string"},
key_type = {type = "string",
enum = {"var", "var_combination"},
default = "var",
},
},
required = {"conn", "burst", "default_conn_delay", "key"}
}
local _M = {
version = 0.1,
priority = 1003,
name = plugin_name,
schema = schema,
}
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
function _M.preread(conf, ctx)
return limit_conn.increase(conf, ctx)
end
function _M.log(conf, ctx)
return limit_conn.decrease(conf, ctx)
end
return _M

View File

@@ -0,0 +1,186 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local bit = require("bit")
local ngx = ngx
local str_byte = string.byte
local str_sub = string.sub
core.ctx.register_var("mqtt_client_id", function(ctx)
return ctx.mqtt_client_id
end)
local schema = {
type = "object",
properties = {
protocol_name = {type = "string"},
protocol_level = {type = "integer"}
},
required = {"protocol_name", "protocol_level"},
}
local plugin_name = "mqtt-proxy"
local _M = {
version = 0.1,
priority = 1000,
name = plugin_name,
schema = schema,
}
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
local function decode_variable_byte_int(data, offset)
local multiplier = 1
local len = 0
local pos
for i = offset, offset + 3 do
pos = i
local byte = str_byte(data, i, i)
len = len + bit.band(byte, 127) * multiplier
multiplier = multiplier * 128
if bit.band(byte, 128) == 0 then
break
end
end
return len, pos
end
local function parse_msg_hdr(data)
local packet_type_flags_byte = str_byte(data, 1, 1)
if packet_type_flags_byte < 16 or packet_type_flags_byte > 32 then
return nil, nil,
"Received unexpected MQTT packet type+flags: " .. packet_type_flags_byte
end
local len, pos = decode_variable_byte_int(data, 2)
return len, pos
end
local function parse_mqtt(data, parsed_pos)
local res = {}
local protocol_len = str_byte(data, parsed_pos + 1, parsed_pos + 1) * 256
+ str_byte(data, parsed_pos + 2, parsed_pos + 2)
parsed_pos = parsed_pos + 2
res.protocol = str_sub(data, parsed_pos + 1, parsed_pos + protocol_len)
parsed_pos = parsed_pos + protocol_len
res.protocol_ver = str_byte(data, parsed_pos + 1, parsed_pos + 1)
parsed_pos = parsed_pos + 1
-- skip control flags & keepalive
parsed_pos = parsed_pos + 3
if res.protocol_ver == 5 then
-- skip properties
local property_len
property_len, parsed_pos = decode_variable_byte_int(data, parsed_pos + 1)
parsed_pos = parsed_pos + property_len
end
local client_id_len = str_byte(data, parsed_pos + 1, parsed_pos + 1) * 256
+ str_byte(data, parsed_pos + 2, parsed_pos + 2)
parsed_pos = parsed_pos + 2
if parsed_pos + client_id_len > #data then
res.expect_len = parsed_pos + client_id_len
return res
end
if client_id_len == 0 then
-- A Server MAY allow a Client to supply a ClientID that has a length of zero bytes
res.client_id = ""
else
res.client_id = str_sub(data, parsed_pos + 1, parsed_pos + client_id_len)
end
parsed_pos = parsed_pos + client_id_len
res.expect_len = parsed_pos
return res
end
function _M.preread(conf, ctx)
local sock = ngx.req.socket()
-- the header format of MQTT CONNECT can be found in
-- https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901033
local data, err = sock:peek(5)
if not data then
core.log.error("failed to read the msg header: ", err)
return 503
end
local remain_len, pos, err = parse_msg_hdr(data)
if not remain_len then
core.log.error("failed to parse the msg header: ", err)
return 503
end
local data, err = sock:peek(pos + remain_len)
if not data then
core.log.error("failed to read the Connect Command: ", err)
return 503
end
local res = parse_mqtt(data, pos)
if res.expect_len > #data then
core.log.error("failed to parse mqtt request, expect len: ",
res.expect_len, " but got ", #data)
return 503
end
if res.protocol and res.protocol ~= conf.protocol_name then
core.log.error("expect protocol name: ", conf.protocol_name,
", but got ", res.protocol)
return 503
end
if res.protocol_ver and res.protocol_ver ~= conf.protocol_level then
core.log.error("expect protocol level: ", conf.protocol_level,
", but got ", res.protocol_ver)
return 503
end
core.log.info("mqtt client id: ", res.client_id)
-- when client id is missing, fallback to balance by client IP
if res.client_id ~= "" then
ctx.mqtt_client_id = res.client_id
end
return
end
function _M.log(conf, ctx)
core.log.info("plugin log phase, conf: ", core.json.encode(conf))
end
return _M

View File

@@ -0,0 +1,48 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local exporter = require("apisix.plugins.prometheus.exporter")
local plugin_name = "prometheus"
local schema = {
type = "object",
properties = {
prefer_name = {
type = "boolean",
default = false -- stream route doesn't have name yet
}
},
}
local _M = {
version = 0.1,
priority = 500,
name = plugin_name,
log = exporter.stream_log,
schema = schema,
run_policy = "prefer_route",
}
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
return _M

View File

@@ -0,0 +1,80 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local syslog = require("apisix.plugins.syslog.init")
local plugin_name = "syslog"
local batch_processor_manager = bp_manager_mod.new("stream sys logger")
local schema = {
type = "object",
properties = {
host = {type = "string"},
port = {type = "integer"},
flush_limit = {type = "integer", minimum = 1, default = 4096},
drop_limit = {type = "integer", default = 1048576},
timeout = {type = "integer", minimum = 1, default = 3000},
log_format = {type = "object"},
sock_type = {type = "string", default = "tcp", enum = {"tcp", "udp"}},
pool_size = {type = "integer", minimum = 5, default = 5},
tls = {type = "boolean", default = false}
},
required = {"host", "port"}
}
local schema = batch_processor_manager:wrap_schema(schema)
local metadata_schema = {
type = "object",
properties = {
log_format = {
type = "object"
}
},
}
local _M = {
version = 0.1,
priority = 401,
name = plugin_name,
schema = schema,
metadata_schema = metadata_schema,
flush_syslog = syslog.flush_syslog,
}
function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
return core.schema.check(schema, conf)
end
function _M.log(conf, ctx)
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
if not entry then
return
end
syslog.push_entry(conf, ctx, entry)
end
return _M

View File

@@ -0,0 +1,249 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local core_ip = require("apisix.core.ip")
local config_util = require("apisix.core.config_util")
local stream_plugin_checker = require("apisix.plugin").stream_plugin_checker
local router_new = require("apisix.utils.router").new
local apisix_ssl = require("apisix.ssl")
local xrpc = require("apisix.stream.xrpc")
local error = error
local tonumber = tonumber
local ipairs = ipairs
local user_routes
local router_ver
local tls_router
local other_routes = {}
local _M = {version = 0.1}
local function match_addrs(route, vars)
-- todo: use resty-ipmatcher to support multiple ip address
if route.value.remote_addr then
local ok, _ = route.value.remote_addr_matcher:match(vars.remote_addr)
if not ok then
return false
end
end
if route.value.server_addr then
local ok, _ = route.value.server_addr_matcher:match(vars.server_addr)
if not ok then
return false
end
end
-- todo: use resty-ipmatcher to support multiple ip address
if route.value.server_port and
route.value.server_port ~= tonumber(vars.server_port) then
return false
end
return true
end
local create_router
do
local sni_to_items = {}
local tls_routes = {}
function create_router(items)
local tls_routes_idx = 1
local other_routes_idx = 1
core.table.clear(tls_routes)
core.table.clear(other_routes)
core.table.clear(sni_to_items)
for _, item in config_util.iterate_values(items) do
if item.value == nil then
goto CONTINUE
end
local route = item.value
if route.protocol and route.protocol.superior_id then
-- subordinate route won't be matched in the entry
-- TODO: check the subordinate relationship in the Admin API
goto CONTINUE
end
if item.value.remote_addr then
item.value.remote_addr_matcher = core_ip.create_ip_matcher({item.value.remote_addr})
end
if item.value.server_addr then
item.value.server_addr_matcher = core_ip.create_ip_matcher({item.value.server_addr})
end
if not route.sni then
other_routes[other_routes_idx] = item
other_routes_idx = other_routes_idx + 1
goto CONTINUE
end
local sni_rev = route.sni:reverse()
local stored = sni_to_items[sni_rev]
if stored then
core.table.insert(stored, item)
goto CONTINUE
end
sni_to_items[sni_rev] = {item}
tls_routes[tls_routes_idx] = {
paths = sni_rev,
filter_fun = function (vars, opts, ctx)
local items = sni_to_items[sni_rev]
for _, route in ipairs(items) do
local hit = match_addrs(route, vars)
if hit then
ctx.matched_route = route
return true
end
end
return false
end,
handler = function (ctx, sni_rev)
-- done in the filter_fun
end
}
tls_routes_idx = tls_routes_idx + 1
::CONTINUE::
end
if #tls_routes > 0 then
local router, err = router_new(tls_routes)
if not router then
return err
end
tls_router = router
end
return nil
end
end
do
local match_opts = {}
function _M.match(api_ctx)
if router_ver ~= user_routes.conf_version then
local err = create_router(user_routes.values)
if err then
return false, "failed to create router: " .. err
end
router_ver = user_routes.conf_version
end
local sni = apisix_ssl.server_name()
if sni and tls_router then
local sni_rev = sni:reverse()
core.table.clear(match_opts)
match_opts.vars = api_ctx.var
local _, err = tls_router:dispatch(sni_rev, match_opts, api_ctx)
if err then
return false, "failed to match TLS router: " .. err
end
end
if api_ctx.matched_route then
-- unlike the matcher for the SSL, it is fine to let
-- '*.x.com' to match 'a.b.x.com' as we don't care about
-- the certificate
return true
end
for _, route in ipairs(other_routes) do
local hit = match_addrs(route, api_ctx.var)
if hit then
api_ctx.matched_route = route
return true
end
end
core.log.info("not hit any route")
return true
end
end
function _M.routes()
if not user_routes then
return nil, nil
end
return user_routes.values, user_routes.conf_version
end
local function stream_route_checker(item, in_cp)
if item.plugins then
local ok, message = stream_plugin_checker(item, in_cp)
if not ok then
return false, message
end
end
-- validate the address format when remote_address or server_address is not nil
if item.remote_addr then
if not core_ip.validate_cidr_or_ip(item.remote_addr) then
return false, "invalid remote_addr: " .. item.remote_addr
end
end
if item.server_addr then
if not core_ip.validate_cidr_or_ip(item.server_addr) then
return false, "invalid server_addr: " .. item.server_addr
end
end
if item.protocol then
local prot_conf = item.protocol
if prot_conf then
local ok, message = xrpc.check_schema(prot_conf, false)
if not ok then
return false, message
end
end
end
return true
end
_M.stream_route_checker = stream_route_checker
function _M.stream_init_worker(filter)
local err
user_routes, err = core.config.new("/stream_routes", {
automatic = true,
item_schema = core.schema.stream_route,
checker = function(item)
return stream_route_checker(item)
end,
filter = filter,
})
if not user_routes then
error("failed to create etcd instance for fetching /stream_routes : "
.. err)
end
end
return _M

View File

@@ -0,0 +1,121 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local require = require
local core = require("apisix.core")
local metrics = require("apisix.stream.xrpc.metrics")
local ipairs = ipairs
local pairs = pairs
local ngx_exit = ngx.exit
local is_http = true
local runner
if ngx.config.subsystem ~= "http" then
is_http = false
runner = require("apisix.stream.xrpc.runner")
end
local _M = {}
local registered_protocols = {}
local registered_protocol_schemas = {}
-- only need to load schema module when it is used in Admin API
local function register_protocol(name, is_http)
if not is_http then
registered_protocols[name] = require("apisix.stream.xrpc.protocols." .. name)
end
registered_protocol_schemas[name] =
require("apisix.stream.xrpc.protocols." .. name .. ".schema")
end
function _M.init()
local local_conf = core.config.local_conf()
if not local_conf.xrpc then
return
end
local prot_conf = local_conf.xrpc.protocols
if not prot_conf then
return
end
if is_http and not local_conf.apisix.enable_admin then
-- we need to register xRPC protocols in HTTP only when Admin API is enabled
return
end
for _, prot in ipairs(prot_conf) do
core.log.info("register xprc protocol ", prot.name)
register_protocol(prot.name, is_http)
end
end
function _M.init_metrics(collector)
local local_conf = core.config.local_conf()
if not local_conf.xrpc then
return
end
local prot_conf = local_conf.xrpc.protocols
if not prot_conf then
return
end
for _, prot in ipairs(prot_conf) do
metrics.store(collector, prot.name)
end
end
function _M.init_worker()
for name, prot in pairs(registered_protocols) do
if not is_http and prot.init_worker then
prot.init_worker()
end
end
end
function _M.check_schema(item, skip_disabled_plugin)
local name = item.name
local protocol = registered_protocol_schemas[name]
if not protocol and not skip_disabled_plugin then
-- like plugins, ignore unknown plugin if the schema is checked in the DP
return false, "unknown protocol [" .. name .. "]"
end
-- check protocol-specific configuration
if not item.conf then
return true
end
return protocol.check_schema(item.conf)
end
function _M.run_protocol(conf, ctx)
local name = conf.name
local protocol = registered_protocols[name]
local code = runner.run(protocol, ctx)
return ngx_exit(code)
end
return _M

View File

@@ -0,0 +1,50 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local require = require
local core = require("apisix.core")
local pairs = pairs
local pcall = pcall
local _M = {}
local hubs = {}
function _M.store(prometheus, name)
local ok, m = pcall(require, "apisix.stream.xrpc.protocols." .. name .. ".metrics")
if not ok then
core.log.notice("no metric for protocol ", name)
return
end
local hub = {}
for metric, conf in pairs(m) do
core.log.notice("register metric ", metric, " for protocol ", name)
hub[metric] = prometheus[conf.type](prometheus, name .. '_' .. metric,
conf.help, conf.labels, conf.buckets)
end
hubs[name] = hub
end
function _M.load(name)
return hubs[name]
end
return _M

View File

@@ -0,0 +1,231 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local sdk = require("apisix.stream.xrpc.sdk")
local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
local math_random = math.random
local ngx = ngx
local OK = ngx.OK
local str_format = string.format
local DECLINED = ngx.DECLINED
local DONE = ngx.DONE
local bit = require("bit")
local ffi = require("ffi")
local ffi_str = ffi.string
-- dubbo protocol spec: https://cn.dubbo.apache.org/zh-cn/overview/reference/protocols/tcp/
local header_len = 16
local _M = {}
function _M.init_downstream(session)
session.req_id_seq = 0
session.resp_id_seq = 0
session.cmd_labels = { session.route.id, "" }
return xrpc_socket.downstream.socket()
end
local function parse_dubbo_header(header)
for i = 1, header_len do
local currentByte = header:byte(i)
if not currentByte then
return nil
end
end
local magic_number = str_format("%04x", header:byte(1) * 256 + header:byte(2))
local message_flag = header:byte(3)
local status = header:byte(4)
local request_id = 0
for i = 5, 12 do
request_id = request_id * 256 + header:byte(i)
end
local byte13Val = header:byte(13) * 256 * 256 * 256
local byte14Val = header:byte(14) * 256 * 256
local data_length = byte13Val + byte14Val + header:byte(15) * 256 + header:byte(16)
local is_request = bit.band(bit.rshift(message_flag, 7), 0x01) == 1 and 1 or 0
local is_two_way = bit.band(bit.rshift(message_flag, 6), 0x01) == 1 and 1 or 0
local is_event = bit.band(bit.rshift(message_flag, 5), 0x01) == 1 and 1 or 0
return {
magic_number = magic_number,
message_flag = message_flag,
is_request = is_request,
is_two_way = is_two_way,
is_event = is_event,
status = status,
request_id = request_id,
data_length = data_length
}
end
local function read_data(sk, is_req)
local header_data, err = sk:read(header_len)
if not header_data then
return nil, err, false
end
local header_str = ffi_str(header_data, header_len)
local header_info = parse_dubbo_header(header_str)
if not header_info then
return nil, "header insufficient", false
end
local is_valid_magic_number = header_info.magic_number == "dabb"
if not is_valid_magic_number then
return nil, str_format("unknown magic number: \"%s\"", header_info.magic_number), false
end
local body_data, err = sk:read(header_info.data_length)
if not body_data then
core.log.error("failed to read dubbo request body")
return nil, err, false
end
local ctx = ngx.ctx
ctx.dubbo_serialization_id = bit.band(header_info.message_flag, 0x1F)
if is_req then
ctx.dubbo_req_body_data = body_data
else
ctx.dubbo_rsp_body_data = body_data
end
return true, nil, false
end
local function read_req(sk)
return read_data(sk, true)
end
local function read_reply(sk)
return read_data(sk, false)
end
local function handle_reply(session, sk)
local ok, err = read_reply(sk)
if not ok then
return nil, err
end
local ctx = sdk.get_req_ctx(session, 10)
return ctx
end
function _M.from_downstream(session, downstream)
local read_pipeline = false
session.req_id_seq = session.req_id_seq + 1
local ctx = sdk.get_req_ctx(session, session.req_id_seq)
session._downstream_ctx = ctx
while true do
local ok, err, pipelined = read_req(downstream)
if not ok then
if err ~= "timeout" and err ~= "closed" then
core.log.error("failed to read request: ", err)
end
if read_pipeline and err == "timeout" then
break
end
return DECLINED
end
if not pipelined then
break
end
if not read_pipeline then
read_pipeline = true
-- set minimal read timeout to read pipelined data
downstream:settimeouts(0, 0, 1)
end
end
if read_pipeline then
-- set timeout back
downstream:settimeouts(0, 0, 0)
end
return OK, ctx
end
function _M.connect_upstream(session, ctx)
local conf = session.upstream_conf
local nodes = conf.nodes
if #nodes == 0 then
core.log.error("failed to connect: no nodes")
return DECLINED
end
local node = nodes[math_random(#nodes)]
local sk = sdk.connect_upstream(node, conf)
if not sk then
return DECLINED
end
core.log.debug("dubbo_connect_upstream end")
return OK, sk
end
function _M.disconnect_upstream(session, upstream)
sdk.disconnect_upstream(upstream, session.upstream_conf)
end
function _M.to_upstream(session, ctx, downstream, upstream)
local ok, _ = upstream:move(downstream)
if not ok then
return DECLINED
end
return OK
end
function _M.from_upstream(session, downstream, upstream)
local ctx,err = handle_reply(session, upstream)
if err then
return DECLINED
end
local ok, _ = downstream:move(upstream)
if not ok then
return DECLINED
end
return DONE, ctx
end
function _M.log(_, _)
end
return _M

View File

@@ -0,0 +1,32 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local schema = {
type = "object",
}
local _M = {}
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
return _M

View File

@@ -0,0 +1,222 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local ipairs = ipairs
local pairs = pairs
local cmd_to_key_finder = {}
--[[
-- the data is generated from the script below
local redis = require "resty.redis"
local red = redis:new()
local ok, err = red:connect("127.0.0.1", 6379)
if not ok then
ngx.say("failed to connect: ", err)
return
end
local res = red:command("info")
local map = {}
for _, r in ipairs(res) do
local first_key = r[4]
local last_key = r[5]
local step = r[6]
local idx = first_key .. ':' .. last_key .. ':' .. step
if idx ~= "1:1:1" then
-- "1:1:1" is the default
if map[idx] then
table.insert(map[idx], r[1])
else
map[idx] = {r[1]}
end
end
end
for _, r in pairs(map) do
table.sort(r)
end
local dump = require('pl.pretty').dump; dump(map)
--]]
local key_to_cmd = {
["0:0:0"] = {
"acl",
"asking",
"auth",
"bgrewriteaof",
"bgsave",
"blmpop",
"bzmpop",
"client",
"cluster",
"command",
"config",
"dbsize",
"debug",
"discard",
"echo",
"eval",
"eval_ro",
"evalsha",
"evalsha_ro",
"exec",
"failover",
"fcall",
"fcall_ro",
"flushall",
"flushdb",
"function",
"hello",
"info",
"keys",
"lastsave",
"latency",
"lmpop",
"lolwut",
"memory",
"module",
"monitor",
"multi",
"object",
"pfselftest",
"ping",
"psubscribe",
"psync",
"publish",
"pubsub",
"punsubscribe",
"quit",
"randomkey",
"readonly",
"readwrite",
"replconf",
"replicaof",
"reset",
"role",
"save",
"scan",
"script",
"select",
"shutdown",
"sintercard",
"slaveof",
"slowlog",
"subscribe",
"swapdb",
"sync",
"time",
"unsubscribe",
"unwatch",
"wait",
"xgroup",
"xinfo",
"xread",
"xreadgroup",
"zdiff",
"zinter",
"zintercard",
"zmpop",
"zunion"
},
["1:-1:1"] = {
"del",
"exists",
"mget",
"pfcount",
"pfmerge",
"sdiff",
"sdiffstore",
"sinter",
"sinterstore",
"ssubscribe",
"sunion",
"sunionstore",
"sunsubscribe",
"touch",
"unlink",
"watch"
},
["1:-1:2"] = {
"mset",
"msetnx"
},
["1:-2:1"] = {
"blpop",
"brpop",
"bzpopmax",
"bzpopmin"
},
["1:2:1"] = {
"blmove",
"brpoplpush",
"copy",
"geosearchstore",
"lcs",
"lmove",
"rename",
"renamenx",
"rpoplpush",
"smove",
"zrangestore"
},
["2:-1:1"] = {
"bitop"
},
["2:2:1"] = {
"pfdebug"
},
["3:3:1"] = {
"migrate"
}
}
local key_finders = {
["0:0:0"] = false,
["1:-1:1"] = function (idx, narg)
return 1 < idx
end,
["1:-1:2"] = function (idx, narg)
return 1 < idx and idx % 2 == 0
end,
["1:-2:1"] = function (idx, narg)
return 1 < idx and idx < narg - 1
end,
["1:2:1"] = function (idx, narg)
return idx == 2 or idx == 3
end,
["2:-1:1"] = function (idx, narg)
return 2 < idx
end,
["2:2:1"] = function (idx, narg)
return idx == 3
end,
["3:3:1"] = function (idx, narg)
return idx == 4
end
}
for k, cmds in pairs(key_to_cmd) do
for _, cmd in ipairs(cmds) do
cmd_to_key_finder[cmd] = key_finders[k]
end
end
return {
cmd_to_key_finder = cmd_to_key_finder,
default_key_finder = function (idx, narg)
return idx == 2
end,
}

View File

@@ -0,0 +1,499 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local sdk = require("apisix.stream.xrpc.sdk")
local commands = require("apisix.stream.xrpc.protocols.redis.commands")
local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
local ffi = require("ffi")
local ffi_str = ffi.string
local math_random = math.random
local OK = ngx.OK
local DECLINED = ngx.DECLINED
local DONE = ngx.DONE
local sleep = ngx.sleep
local str_byte = string.byte
local str_fmt = string.format
local ipairs = ipairs
local tonumber = tonumber
-- this variable is only used to log the redis command line in log_format
-- and is not used for filter in the logger phase.
core.ctx.register_var("redis_cmd_line", function(ctx)
return core.table.concat(ctx.cmd_line, " ")
end)
-- redis protocol spec: https://redis.io/docs/reference/protocol-spec/
-- There is no plan to support inline command format
local protocol_name = "redis"
local _M = {}
local MAX_LINE_LEN = 128
local MAX_VALUE_LEN = 128
local PREFIX_ARR = str_byte("*")
local PREFIX_STR = str_byte("$")
local PREFIX_STA = str_byte("+")
local PREFIX_INT = str_byte(":")
local PREFIX_ERR = str_byte("-")
local lrucache = core.lrucache.new({
type = "plugin",
})
local function create_matcher(conf)
local matcher = {}
--[[
{"delay": 5, "key":"x", "commands":["GET", "MGET"]}
{"delay": 5, "commands":["GET"]}
=> {
get = {keys = {x = {delay = 5}, * = {delay = 5}}}
mget = {keys = {x = {delay = 5}}}
}
]]--
for _, rule in ipairs(conf.faults) do
for _, cmd in ipairs(rule.commands) do
cmd = cmd:lower()
local key = rule.key
local kf = commands.cmd_to_key_finder[cmd]
local key_matcher = matcher[cmd]
if not key_matcher then
key_matcher = {
keys = {}
}
matcher[cmd] = key_matcher
end
if not key or kf == false then
key = "*"
end
if key_matcher.keys[key] then
core.log.warn("override existent fault rule of cmd: ", cmd, ", key: ", key)
end
key_matcher.keys[key] = rule
end
end
return matcher
end
local function get_matcher(conf, ctx)
return core.lrucache.plugin_ctx(lrucache, ctx, nil, create_matcher, conf)
end
function _M.init_downstream(session)
local conf = session.route.protocol.conf
if conf and conf.faults then
local matcher = get_matcher(conf, session.conn_ctx)
session.matcher = matcher
end
session.req_id_seq = 0
session.resp_id_seq = 0
session.cmd_labels = {session.route.id, ""}
return xrpc_socket.downstream.socket()
end
local function read_line(sk)
local p, err, len = sk:read_line(MAX_LINE_LEN)
if not p then
return nil, err
end
if len < 2 then
return nil, "line too short"
end
return p, nil, len
end
local function read_len(sk)
local p, err, len = read_line(sk)
if not p then
return nil, err
end
local s = ffi_str(p + 1, len - 1)
local n = tonumber(s)
if not n then
return nil, str_fmt("invalid len string: \"%s\"", s)
end
return n
end
local function read_req(session, sk)
local narg, err = read_len(sk)
if not narg then
return nil, err
end
local cmd_line = core.tablepool.fetch("xrpc_redis_cmd_line", narg, 0)
local n, err = read_len(sk)
if not n then
return nil, err
end
local p, err = sk:read(n + 2)
if not p then
return nil, err
end
local s = ffi_str(p, n)
local cmd = s:lower()
cmd_line[1] = cmd
if cmd == "subscribe" or cmd == "psubscribe" then
session.in_pub_sub = true
end
local key_finder
local matcher = session.matcher
if matcher then
matcher = matcher[s:lower()]
if matcher then
key_finder = commands.cmd_to_key_finder[s] or commands.default_key_finder
end
end
for i = 2, narg do
local is_key = false
if key_finder then
is_key = key_finder(i, narg)
end
local n, err = read_len(sk)
if not n then
return nil, err
end
local s
if not is_key and n > MAX_VALUE_LEN then
-- avoid recording big value
local p, err = sk:read(MAX_VALUE_LEN)
if not p then
return nil, err
end
local ok, err = sk:drain(n - MAX_VALUE_LEN + 2)
if not ok then
return nil, err
end
s = ffi_str(p, MAX_VALUE_LEN) .. "...(" .. n .. " bytes)"
else
local p, err = sk:read(n + 2)
if not p then
return nil, err
end
s = ffi_str(p, n)
if is_key and matcher.keys[s] then
matcher = matcher.keys[s]
key_finder = nil
end
end
cmd_line[i] = s
end
session.req_id_seq = session.req_id_seq + 1
local ctx = sdk.get_req_ctx(session, session.req_id_seq)
ctx.cmd_line = cmd_line
ctx.cmd = cmd
local pipelined = sk:has_pending_data()
if matcher then
if matcher.keys then
-- try to match any key of this command
matcher = matcher.keys["*"]
end
if matcher then
sleep(matcher.delay)
end
end
return true, nil, pipelined
end
local function read_subscribe_reply(sk)
local line, err, n = read_line(sk)
if not line then
return nil, err
end
local prefix = line[0]
if prefix == PREFIX_STR then -- char '$'
local size = tonumber(ffi_str(line + 1, n - 1))
if size < 0 then
return true
end
local p, err = sk:read(size + 2)
if not p then
return nil, err
end
return ffi_str(p, size)
elseif prefix == PREFIX_INT then -- char ':'
return tonumber(ffi_str(line + 1, n - 1))
else
return nil, str_fmt("unknown prefix: \"%s\"", prefix)
end
end
local function read_reply(sk, session)
local line, err, n = read_line(sk)
if not line then
return nil, err
end
local prefix = line[0]
if prefix == PREFIX_STR then -- char '$'
-- print("bulk reply")
local size = tonumber(ffi_str(line + 1, n - 1))
if size < 0 then
return true
end
local ok, err = sk:drain(size + 2)
if not ok then
return nil, err
end
return true
elseif prefix == PREFIX_STA then -- char '+'
-- print("status reply")
return true
elseif prefix == PREFIX_ARR then -- char '*'
local narr = tonumber(ffi_str(line + 1, n - 1))
-- print("multi-bulk reply: ", narr)
if narr < 0 then
return true
end
if session and session.in_pub_sub and (narr == 3 or narr == 4) then
local msg_type, err = read_subscribe_reply(sk)
if msg_type == nil then
return nil, err
end
session.pub_sub_msg_type = msg_type
local res, err = read_reply(sk)
if res == nil then
return nil, err
end
if msg_type == "unsubscribe" or msg_type == "punsubscribe" then
local n_ch, err = read_subscribe_reply(sk)
if n_ch == nil then
return nil, err
end
if n_ch == 0 then
session.in_pub_sub = -1
-- clear this flag later at the end of `handle_reply`
end
else
local n = msg_type == "pmessage" and 2 or 1
for i = 1, n do
local res, err = read_reply(sk)
if res == nil then
return nil, err
end
end
end
else
for i = 1, narr do
local res, err = read_reply(sk)
if res == nil then
return nil, err
end
end
end
return true
elseif prefix == PREFIX_INT then -- char ':'
-- print("integer reply")
return true
elseif prefix == PREFIX_ERR then -- char '-'
-- print("error reply: ", n)
return true
else
return nil, str_fmt("unknown prefix: \"%s\"", prefix)
end
end
local function handle_reply(session, sk)
local ok, err = read_reply(sk, session)
if not ok then
return nil, err
end
local ctx
if session.in_pub_sub and session.pub_sub_msg_type then
local msg_type = session.pub_sub_msg_type
session.pub_sub_msg_type = nil
if session.resp_id_seq < session.req_id_seq then
local cur_ctx = sdk.get_req_ctx(session, session.resp_id_seq + 1)
local cmd = cur_ctx.cmd
if cmd == msg_type then
ctx = cur_ctx
session.resp_id_seq = session.resp_id_seq + 1
end
end
if session.in_pub_sub == -1 then
session.in_pub_sub = nil
end
else
session.resp_id_seq = session.resp_id_seq + 1
ctx = sdk.get_req_ctx(session, session.resp_id_seq)
end
return ctx
end
function _M.from_downstream(session, downstream)
local read_pipeline = false
while true do
local ok, err, pipelined = read_req(session, downstream)
if not ok then
if err ~= "timeout" and err ~= "closed" then
core.log.error("failed to read request: ", err)
end
if read_pipeline and err == "timeout" then
break
end
return DECLINED
end
if not pipelined then
break
end
if not read_pipeline then
read_pipeline = true
-- set minimal read timeout to read pipelined data
downstream:settimeouts(0, 0, 1)
end
end
if read_pipeline then
-- set timeout back
downstream:settimeouts(0, 0, 0)
end
return OK
end
function _M.connect_upstream(session, ctx)
local conf = session.upstream_conf
local nodes = conf.nodes
if #nodes == 0 then
core.log.error("failed to connect: no nodes")
return DECLINED
end
local node = nodes[math_random(#nodes)]
local sk = sdk.connect_upstream(node, conf)
if not sk then
return DECLINED
end
return OK, sk
end
function _M.disconnect_upstream(session, upstream)
sdk.disconnect_upstream(upstream, session.upstream_conf)
end
function _M.to_upstream(session, ctx, downstream, upstream)
local ok, err = upstream:move(downstream)
if not ok then
core.log.error("failed to send to upstream: ", err)
return DECLINED
end
return OK
end
function _M.from_upstream(session, downstream, upstream)
local ctx, err = handle_reply(session, upstream)
if err then
core.log.error("failed to handle upstream: ", err)
return DECLINED
end
local ok, err = downstream:move(upstream)
if not ok then
core.log.error("failed to handle upstream: ", err)
return DECLINED
end
return DONE, ctx
end
function _M.log(session, ctx)
local metrics = sdk.get_metrics(session, protocol_name)
if metrics then
session.cmd_labels[2] = ctx.cmd
metrics.commands_total:inc(1, session.cmd_labels)
metrics.commands_latency_seconds:observe(ctx.var.rpc_time, session.cmd_labels)
end
core.tablepool.release("xrpc_redis_cmd_line", ctx.cmd_line)
ctx.cmd_line = nil
end
return _M

View File

@@ -0,0 +1,33 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local _M = {
commands_total = {
type = "counter",
help = "Total number of requests for a specific Redis command",
labels = {"route", "command"},
},
commands_latency_seconds = {
type = "histogram",
help = "Latency of requests for a specific Redis command",
labels = {"route", "command"},
-- latency buckets, 1ms to 1s:
buckets = {0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1}
},
}
return _M

View File

@@ -0,0 +1,59 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local schema = {
type = "object",
properties = {
faults = {
type = "array",
minItems = 1,
items = {
type = "object",
properties = {
commands = {
type = "array",
minItems = 1,
items = {
type = "string"
},
},
key = {
type = "string",
minLength = 1,
},
delay = {
type = "number",
description = "additional delay in seconds",
}
},
required = {"commands", "delay"}
},
},
},
}
local _M = {}
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
return _M

View File

@@ -0,0 +1,279 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local require = require
local core = require("apisix.core")
local expr = require("resty.expr.v1")
local pairs = pairs
local ngx = ngx
local ngx_now = ngx.now
local OK = ngx.OK
local DECLINED = ngx.DECLINED
local DONE = ngx.DONE
local pcall = pcall
local ipairs = ipairs
local tostring = tostring
core.ctx.register_var("rpc_time", function(ctx)
return ctx._rpc_end_time - ctx._rpc_start_time
end)
local logger_expr_cache = core.lrucache.new({
ttl = 300, count = 1024
})
local _M = {}
local function open_session(conn_ctx)
conn_ctx.xrpc_session = {
conn_ctx = conn_ctx,
route = conn_ctx.matched_route.value,
-- fields start with '_' should not be accessed by the protocol implementation
_upstream_conf = conn_ctx.matched_upstream,
_ctxs = {},
}
return conn_ctx.xrpc_session
end
local function put_req_ctx(session, ctx)
local id = ctx._id
session._ctxs[id] = nil
core.ctx.release_vars(ctx)
core.tablepool.release("xrpc_ctxs", ctx)
end
local function filter_logger(ctx, logger)
if not logger then
return false
end
if not logger.filter or #logger.filter == 0 then
-- no valid filter, default execution plugin
return true
end
local version = tostring(logger.filter)
local filter_expr, err = logger_expr_cache(ctx.conf_id, version, expr.new, logger.filter)
if not filter_expr or err then
core.log.error("failed to validate the 'filter' expression: ", err)
return false
end
return filter_expr:eval(ctx.var)
end
local function run_log_plugin(ctx, logger)
local pkg_name = "apisix.stream.plugins." .. logger.name
local ok, plugin = pcall(require, pkg_name)
if not ok then
core.log.error("failed to load plugin [", logger.name, "] err: ", plugin)
return
end
local log_func = plugin.log
if log_func then
log_func(logger.conf, ctx)
end
end
local function finialize_req(protocol, session, ctx)
ctx._rpc_end_time = ngx_now()
local loggers = session.route.protocol.logger
if loggers and #loggers > 0 then
for _, logger in ipairs(loggers) do
ctx.conf_id = tostring(logger.conf)
local matched = filter_logger(ctx, logger)
core.log.info("log filter: ", logger.name, " filter result: ", matched)
if matched then
run_log_plugin(ctx, logger)
end
end
end
protocol.log(session, ctx)
put_req_ctx(session, ctx)
end
local function close_session(session, protocol)
local upstream_ctx = session._upstream_ctx
if upstream_ctx then
upstream_ctx.closed = true
local up = upstream_ctx.upstream
protocol.disconnect_upstream(session, up)
end
local upstream_ctxs = session._upstream_ctxs
if upstream_ctxs then
for _, upstream_ctx in pairs(upstream_ctxs) do
upstream_ctx.closed = true
local up = upstream_ctx.upstream
protocol.disconnect_upstream(session, up)
end
end
for id, ctx in pairs(session._ctxs) do
core.log.notice("RPC is not finished, id: ", id)
ctx.unfinished = true
finialize_req(protocol, session, ctx)
end
end
local function open_upstream(protocol, session, ctx)
local key = session._upstream_key
session._upstream_key = nil
if key then
if not session._upstream_ctxs then
session._upstream_ctxs = {}
end
local up_ctx = session._upstream_ctxs[key]
if up_ctx then
return OK, up_ctx
end
else
if session._upstream_ctx then
return OK, session._upstream_ctx
end
session.upstream_conf = session._upstream_conf
end
local state, upstream = protocol.connect_upstream(session, session)
if state ~= OK then
return state, nil
end
local up_ctx = {
upstream = upstream,
closed = false,
}
if key then
session._upstream_ctxs[key] = up_ctx
else
session._upstream_ctx = up_ctx
end
return OK, up_ctx
end
local function start_upstream_coroutine(session, protocol, downstream, up_ctx)
local upstream = up_ctx.upstream
while not up_ctx.closed do
local status, ctx = protocol.from_upstream(session, downstream, upstream)
if status ~= OK then
if ctx ~= nil then
finialize_req(protocol, session, ctx)
end
if status == DECLINED then
-- fail to read
break
end
if status == DONE then
-- a rpc is finished
goto continue
end
end
::continue::
end
end
function _M.run(protocol, conn_ctx)
local session = open_session(conn_ctx)
local downstream = protocol.init_downstream(session)
while true do
local status, ctx = protocol.from_downstream(session, downstream)
if status ~= OK then
if ctx ~= nil then
finialize_req(protocol, session, ctx)
end
if status == DECLINED then
-- fail to read or can't be authorized
break
end
if status == DONE then
-- heartbeat or fault injection, already reply to downstream
goto continue
end
end
-- need to do some auth/routing jobs before reaching upstream
local status, up_ctx = open_upstream(protocol, session, ctx)
if status ~= OK then
if ctx ~= nil then
finialize_req(protocol, session, ctx)
end
break
end
status = protocol.to_upstream(session, ctx, downstream, up_ctx.upstream)
if status ~= OK then
if ctx ~= nil then
finialize_req(protocol, session, ctx)
end
if status == DECLINED then
break
end
if status == DONE then
-- for Unary request we can directly reply here
goto continue
end
end
if not up_ctx.coroutine then
local co, err = ngx.thread.spawn(
start_upstream_coroutine, session, protocol, downstream, up_ctx)
if not co then
core.log.error("failed to start upstream coroutine: ", err)
break
end
up_ctx.coroutine = co
end
::continue::
end
close_session(session, protocol)
-- return non-zero code to terminal the session
return 200
end
return _M

View File

@@ -0,0 +1,202 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
--- Upstream helper functions which can be used in xRPC
--
-- @module xrpc.sdk
local core = require("apisix.core")
local config_util = require("apisix.core.config_util")
local router = require("apisix.stream.router.ip_port")
local metrics = require("apisix.stream.xrpc.metrics")
local apisix_upstream = require("apisix.upstream")
local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
local ngx_now = ngx.now
local str_fmt = string.format
local tab_insert = table.insert
local error = error
local tostring = tostring
local _M = {}
---
-- Returns the connected xRPC upstream socket according to the configuration
--
-- @function xrpc.sdk.connect_upstream
-- @tparam table node selected upstream node
-- @tparam table up_conf upstream configuration
-- @treturn table|nil the xRPC upstream socket, or nil if failed
function _M.connect_upstream(node, up_conf)
local sk = xrpc_socket.upstream.socket()
local timeout = up_conf.timeout
if not timeout then
-- use the default timeout of Nginx proxy
sk:settimeouts(60 * 1000, 600 * 1000, 600 * 1000)
else
-- the timeout unit for balancer is second while the unit for cosocket is millisecond
sk:settimeouts(timeout.connect * 1000, timeout.send * 1000, timeout.read * 1000)
end
local ok, err = sk:connect(node.host, node.port)
if not ok then
core.log.error("failed to connect: ", err)
return nil
end
if up_conf.scheme == "tls" then
-- TODO: support mTLS
local ok, err = sk:sslhandshake(nil, node.host)
if not ok then
core.log.error("failed to handshake: ", err)
return nil
end
end
return sk
end
---
-- Disconnect xRPC upstream socket according to the configuration
--
-- @function xrpc.sdk.disconnect_upstream
-- @tparam table upstream xRPC upstream socket
-- @tparam table up_conf upstream configuration
function _M.disconnect_upstream(upstream, up_conf)
return upstream:close()
end
---
-- Returns the request level ctx with an id
--
-- @function xrpc.sdk.get_req_ctx
-- @tparam table session xrpc session
-- @tparam string id ctx id
-- @treturn table the request level ctx
function _M.get_req_ctx(session, id)
if not id then
error("id is required")
end
local ctx = session._ctxs[id]
if ctx then
return ctx
end
local ctx = core.tablepool.fetch("xrpc_ctxs", 4, 4)
-- fields start with '_' should not be accessed by the protocol implementation
ctx._id = id
core.ctx.set_vars_meta(ctx)
ctx.conf_type = "xrpc-" .. session.route.protocol.name .. "-logger"
session._ctxs[id] = ctx
ctx._rpc_start_time = ngx_now()
return ctx
end
---
-- Returns the new router if the stream routes are changed
--
-- @function xrpc.sdk.get_router
-- @tparam table session xrpc session
-- @tparam string version the current router version, should come from the last call
-- @treturn boolean whether there is a change
-- @treturn table the new router under the specific protocol
-- @treturn string the new router version
function _M.get_router(session, version)
local protocol_name = session.route.protocol.name
local id = session.route.id
local items, conf_version = router.routes()
if version == conf_version then
return false
end
local proto_router = {}
for _, item in config_util.iterate_values(items) do
if item.value == nil then
goto CONTINUE
end
local route = item.value
if route.protocol.name ~= protocol_name then
goto CONTINUE
end
if tostring(route.protocol.superior_id) ~= id then
goto CONTINUE
end
tab_insert(proto_router, route)
::CONTINUE::
end
return true, proto_router, conf_version
end
---
-- Set the session's current upstream according to the route's configuration
--
-- @function xrpc.sdk.set_upstream
-- @tparam table session xrpc session
-- @tparam table conf the route configuration
-- @treturn nil|string error message if present
function _M.set_upstream(session, conf)
local up
if conf.upstream then
up = conf.upstream
else
local id = conf.upstream_id
up = apisix_upstream.get_by_id(id)
if not up then
return str_fmt("upstream %s can't be got", id)
end
end
local key = tostring(up)
core.log.info("set upstream to: ", key, " conf: ", core.json.delay_encode(up, true))
session._upstream_key = key
session.upstream_conf = up
return nil
end
---
-- Returns the protocol specific metrics object
--
-- @function xrpc.sdk.get_metrics
-- @tparam table session xrpc session
-- @tparam string protocol_name protocol name
-- @treturn nil|table the metrics under the specific protocol if available
function _M.get_metrics(session, protocol_name)
local metric_conf = session.route.protocol.metric
if not (metric_conf and metric_conf.enable) then
return nil
end
return metrics.load(protocol_name)
end
return _M