-- -- Licensed to the Apache Software Foundation (ASF) under one or more -- contributor license agreements. See the NOTICE file distributed with -- this work for additional information regarding copyright ownership. -- The ASF licenses this file to You under the Apache License, Version 2.0 -- (the "License"); you may not use this file except in compliance with -- the License. You may obtain a copy of the License at -- -- http://www.apache.org/licenses/LICENSE-2.0 -- -- Unless required by applicable law or agreed to in writing, software -- distributed under the License is distributed on an "AS IS" BASIS, -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- See the License for the specific language governing permissions and -- limitations under the License. -- local require = require -- set the JIT options before any code, to prevent error "changing jit stack size is not -- allowed when some regexs have already been compiled and cached" if require("ffi").os == "Linux" then require("ngx.re").opt("jit_stack_size", 200 * 1024) end require("jit.opt").start("minstitch=2", "maxtrace=4000", "maxrecord=8000", "sizemcode=64", "maxmcode=4000", "maxirconst=1000") require("apisix.patch").patch() local core = require("apisix.core") local plugin = require("apisix.plugin") local plugin_config = require("apisix.plugin_config") local consumer_group = require("apisix.consumer_group") local script = require("apisix.script") local service_fetch = require("apisix.http.service").get local admin_init = require("apisix.admin.init") local get_var = require("resty.ngxvar").fetch local router = require("apisix.router") local apisix_upstream = require("apisix.upstream") local apisix_secret = require("apisix.secret") local set_upstream = apisix_upstream.set_by_route local apisix_ssl = require("apisix.ssl") local apisix_global_rules = require("apisix.global_rules") local upstream_util = require("apisix.utils.upstream") local xrpc = require("apisix.stream.xrpc") local ctxdump = require("resty.ctxdump") local debug = require("apisix.debug") local pubsub_kafka = require("apisix.pubsub.kafka") local ngx = ngx local get_method = ngx.req.get_method local ngx_exit = ngx.exit local math = math local ipairs = ipairs local ngx_now = ngx.now local ngx_var = ngx.var local re_split = require("ngx.re").split local str_byte = string.byte local str_sub = string.sub local tonumber = tonumber local type = type local pairs = pairs local tostring = tostring local ngx_re_match = ngx.re.match local control_api_router local is_http = false if ngx.config.subsystem == "http" then is_http = true control_api_router = require("apisix.control.router") end local ok, apisix_base_flags = pcall(require, "resty.apisix.patch") if not ok then apisix_base_flags = {} end local load_balancer local local_conf local ver_header = "APISIX/" .. core.version.VERSION local has_mod, apisix_ngx_client = pcall(require, "resty.apisix.client") local _M = {version = 0.4} function _M.http_init(args) core.resolver.init_resolver(args) core.id.init() core.env.init() local process = require("ngx.process") local ok, err = process.enable_privileged_agent() if not ok then core.log.error("failed to enable privileged_agent: ", err) end if core.config.init then local ok, err = core.config.init() if not ok then core.log.error("failed to load the configuration: ", err) end end xrpc.init() end function _M.http_init_worker() local seed, err = core.utils.get_seed_from_urandom() if not seed then core.log.warn('failed to get seed from urandom: ', err) seed = ngx_now() * 1000 + ngx.worker.pid() end math.randomseed(seed) -- for testing only core.log.info("random test in [1, 10000]: ", math.random(1, 10000)) require("apisix.events").init_worker() local discovery = require("apisix.discovery.init").discovery if discovery and discovery.init_worker then discovery.init_worker() end require("apisix.balancer").init_worker() load_balancer = require("apisix.balancer") require("apisix.admin.init").init_worker() require("apisix.timers").init_worker() require("apisix.debug").init_worker() if core.config.init_worker then local ok, err = core.config.init_worker() if not ok then core.log.error("failed to init worker process of ", core.config.type, " config center, err: ", err) end end plugin.init_worker() router.http_init_worker() require("apisix.http.service").init_worker() plugin_config.init_worker() require("apisix.consumer").init_worker() consumer_group.init_worker() apisix_secret.init_worker() apisix_global_rules.init_worker() apisix_upstream.init_worker() require("apisix.plugins.ext-plugin.init").init_worker() control_api_router.init_worker() local_conf = core.config.local_conf() if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then ver_header = "APISIX" end end function _M.http_exit_worker() -- TODO: we can support stream plugin later - currently there is not `destroy` method -- in stream plugins plugin.exit_worker() require("apisix.plugins.ext-plugin.init").exit_worker() end function _M.ssl_phase() local ok, err = router.router_ssl.set(ngx.ctx.matched_ssl) if not ok then if err then core.log.error("failed to fetch ssl config: ", err) end ngx_exit(-1) end end function _M.ssl_client_hello_phase() local sni, err = apisix_ssl.server_name(true) if not sni or type(sni) ~= "string" then local advise = "please check if the client requests via IP or uses an outdated " .. "protocol. If you need to report an issue, " .. "provide a packet capture file of the TLS handshake." core.log.error("failed to find SNI: " .. (err or advise)) ngx_exit(-1) end local tls_ext_status_req = apisix_ssl.get_status_request_ext() local ngx_ctx = ngx.ctx local api_ctx = core.tablepool.fetch("api_ctx", 0, 32) ngx_ctx.api_ctx = api_ctx local ok, err = router.router_ssl.match_and_set(api_ctx, true, sni) ngx_ctx.matched_ssl = api_ctx.matched_ssl core.tablepool.release("api_ctx", api_ctx) ngx_ctx.api_ctx = nil ngx_ctx.tls_ext_status_req = tls_ext_status_req if not ok then if err then core.log.error("failed to fetch ssl config: ", err) end core.log.error("failed to match any SSL certificate by SNI: ", sni) ngx_exit(-1) end ok, err = apisix_ssl.set_protocols_by_clienthello(ngx_ctx.matched_ssl.value.ssl_protocols) if not ok then core.log.error("failed to set ssl protocols: ", err) ngx_exit(-1) end -- in stream subsystem, ngx.ssl.server_name() return hostname of ssl session in preread phase, -- so that we can't get real SNI without recording it in ngx.ctx during client_hello phase ngx.ctx.client_hello_sni = sni end local function stash_ngx_ctx() local ref = ctxdump.stash_ngx_ctx() core.log.info("stash ngx ctx: ", ref) ngx_var.ctx_ref = ref end local function fetch_ctx() local ref = ngx_var.ctx_ref core.log.info("fetch ngx ctx: ", ref) local ctx = ctxdump.apply_ngx_ctx(ref) ngx_var.ctx_ref = '' return ctx end local function parse_domain_in_route(route) local nodes = route.value.upstream.nodes local new_nodes, err = upstream_util.parse_domain_for_nodes(nodes) if not new_nodes then return nil, err end local up_conf = route.dns_value and route.dns_value.upstream local ok = upstream_util.compare_upstream_node(up_conf, new_nodes) if ok then return route end -- don't modify the modifiedIndex to avoid plugin cache miss because of DNS resolve result -- has changed route.dns_value = core.table.deepcopy(route.value, { shallows = { "self.upstream.parent"}}) route.dns_value.upstream.nodes = new_nodes core.log.info("parse route which contain domain: ", core.json.delay_encode(route, true)) return route end local function set_upstream_host(api_ctx, picked_server) local up_conf = api_ctx.upstream_conf if up_conf.pass_host then api_ctx.pass_host = up_conf.pass_host api_ctx.upstream_host = up_conf.upstream_host end local pass_host = api_ctx.pass_host or "pass" if pass_host == "pass" then return end if pass_host == "rewrite" then api_ctx.var.upstream_host = api_ctx.upstream_host return end api_ctx.var.upstream_host = picked_server.upstream_host end local function set_upstream_headers(api_ctx, picked_server) set_upstream_host(api_ctx, picked_server) local proto = api_ctx.var.http_x_forwarded_proto if proto then api_ctx.var.var_x_forwarded_proto = proto end local x_forwarded_host = api_ctx.var.http_x_forwarded_host if x_forwarded_host then api_ctx.var.var_x_forwarded_host = x_forwarded_host end local port = api_ctx.var.http_x_forwarded_port if port then api_ctx.var.var_x_forwarded_port = port end end -- verify the TLS session resumption by checking if the SNI in the client hello -- matches the hostname of the SSL session, this is to prevent the mTLS bypass security issue. local function verify_tls_session_resumption() local session_hostname, err = apisix_ssl.session_hostname() if err then core.log.error("failed to get session hostname: ", err) return false end if session_hostname and session_hostname ~= ngx.ctx.client_hello_sni then core.log.error("sni in client hello mismatch hostname of ssl session, ", "sni: ", ngx.ctx.client_hello_sni, ", hostname: ", session_hostname) return false end return true end local function verify_tls_client(ctx) local matched = router.router_ssl.match_and_set(ctx, true) if not matched then return true end local matched_ssl = ctx.matched_ssl if matched_ssl.value.client and apisix_ssl.support_client_verification() then local res = ngx_var.ssl_client_verify if res ~= "SUCCESS" then if res == "NONE" then core.log.error("client certificate was not present") else core.log.error("client certificate verification is not passed: ", res) end return false end if not verify_tls_session_resumption() then return false end end return true end local function uri_matches_skip_mtls_route_patterns(ssl, uri) for _, pat in ipairs(ssl.value.client.skip_mtls_uri_regex) do if ngx_re_match(uri, pat, "jo") then return true end end end local function verify_https_client(ctx) local scheme = ctx.var.scheme if scheme ~= "https" then return true end local matched_ssl = ngx.ctx.matched_ssl if matched_ssl.value.client and matched_ssl.value.client.skip_mtls_uri_regex and apisix_ssl.support_client_verification() and (not uri_matches_skip_mtls_route_patterns(matched_ssl, ngx.var.uri)) then local res = ctx.var.ssl_client_verify if res ~= "SUCCESS" then if res == "NONE" then core.log.error("client certificate was not present") else core.log.error("client certificate verification is not passed: ", res) end return false end end local host = ctx.var.host local matched = router.router_ssl.match_and_set(ctx, true, host) if not matched then return true end local matched_ssl = ctx.matched_ssl if matched_ssl.value.client and apisix_ssl.support_client_verification() then local verified = apisix_base_flags.client_cert_verified_in_handshake if not verified then -- vanilla OpenResty requires to check the verification result local res = ctx.var.ssl_client_verify if res ~= "SUCCESS" then if res == "NONE" then core.log.error("client certificate was not present") else core.log.error("client certificate verification is not passed: ", res) end return false end end local sni = apisix_ssl.server_name() if sni ~= host then -- There is a case that the user configures a SSL object with `*.domain`, -- and the client accesses with SNI `a.domain` but uses Host `b.domain`. -- This case is complex and we choose to restrict the access until there -- is a stronge demand in real world. core.log.error("client certificate verified with SNI ", sni, ", but the host is ", host) return false end if not verify_tls_session_resumption() then return false end end return true end local function normalize_uri_like_servlet(uri) local found = core.string.find(uri, ';') if not found then return uri end local segs, err = re_split(uri, "/", "jo") if not segs then return nil, err end local len = #segs for i = 1, len do local seg = segs[i] local pos = core.string.find(seg, ';') if pos then seg = seg:sub(1, pos - 1) -- reject bad uri which bypasses with ';' if seg == "." or seg == ".." then return nil, "dot segment with parameter" end if seg == "" and i < len then return nil, "empty segment with parameters" end segs[i] = seg seg = seg:lower() if seg == "%2e" or seg == "%2e%2e" then return nil, "encoded dot segment" end end end return core.table.concat(segs, '/') end local function common_phase(phase_name) local api_ctx = ngx.ctx.api_ctx if not api_ctx then return end plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name) if api_ctx.script_obj then script.run(phase_name, api_ctx) return api_ctx, true end return plugin.run_plugin(phase_name, nil, api_ctx) end function _M.handle_upstream(api_ctx, route, enable_websocket) -- some plugins(ai-proxy...) request upstream by http client directly if api_ctx.bypass_nginx_upstream then common_phase("before_proxy") return end local up_id = route.value.upstream_id -- used for the traffic-split plugin if api_ctx.upstream_id then up_id = api_ctx.upstream_id end if up_id then local upstream = apisix_upstream.get_by_id(up_id) if not upstream then if is_http then return core.response.exit(502) end return ngx_exit(1) end api_ctx.matched_upstream = upstream else if route.has_domain then local err route, err = parse_domain_in_route(route) if err then core.log.error("failed to get resolved route: ", err) return core.response.exit(500) end api_ctx.conf_version = route.modifiedIndex api_ctx.matched_route = route end local route_val = route.value api_ctx.matched_upstream = (route.dns_value and route.dns_value.upstream) or route_val.upstream end if api_ctx.matched_upstream and api_ctx.matched_upstream.tls and api_ctx.matched_upstream.tls.client_cert_id then local cert_id = api_ctx.matched_upstream.tls.client_cert_id local upstream_ssl = router.router_ssl.get_by_id(cert_id) if not upstream_ssl or upstream_ssl.type ~= "client" then local err = upstream_ssl and "ssl type should be 'client'" or "ssl id [" .. cert_id .. "] not exits" core.log.error("failed to get ssl cert: ", err) if is_http then return core.response.exit(502) end return ngx_exit(1) end core.log.info("matched ssl: ", core.json.delay_encode(upstream_ssl, true)) api_ctx.upstream_ssl = upstream_ssl end if enable_websocket then api_ctx.var.upstream_upgrade = api_ctx.var.http_upgrade api_ctx.var.upstream_connection = api_ctx.var.http_connection core.log.info("enabled websocket for route: ", route.value.id) end -- load balancer is not required by kafka upstream, so the upstream -- node selection process is intercepted and left to kafka to -- handle on its own if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then return pubsub_kafka.access(api_ctx) end local code, err = set_upstream(route, api_ctx) if code then core.log.error("failed to set upstream: ", err) core.response.exit(code) end local server, err = load_balancer.pick_server(route, api_ctx) if not server then core.log.error("failed to pick server: ", err) return core.response.exit(502) end api_ctx.picked_server = server set_upstream_headers(api_ctx, server) -- run the before_proxy method in access phase first to avoid always reinit request common_phase("before_proxy") local up_scheme = api_ctx.upstream_scheme if up_scheme == "grpcs" or up_scheme == "grpc" then stash_ngx_ctx() return ngx.exec("@grpc_pass") end if api_ctx.dubbo_proxy_enabled then stash_ngx_ctx() return ngx.exec("@dubbo_pass") end end function _M.http_access_phase() -- from HTTP/3 to HTTP/1.1 we need to convert :authority pesudo-header -- to Host header, so we set upstream_host variable here. if ngx.req.http_version() == 3 then ngx.var.upstream_host = ngx.var.host .. ":" .. ngx.var.server_port end local ngx_ctx = ngx.ctx -- always fetch table from the table pool, we don't need a reused api_ctx local api_ctx = core.tablepool.fetch("api_ctx", 0, 32) ngx_ctx.api_ctx = api_ctx core.ctx.set_vars_meta(api_ctx) if not verify_https_client(api_ctx) then return core.response.exit(400) end debug.dynamic_debug(api_ctx) local uri = api_ctx.var.uri if local_conf.apisix then if local_conf.apisix.delete_uri_tail_slash then if str_byte(uri, #uri) == str_byte("/") then api_ctx.var.uri = str_sub(api_ctx.var.uri, 1, #uri - 1) core.log.info("remove the end of uri '/', current uri: ", api_ctx.var.uri) end end if local_conf.apisix.normalize_uri_like_servlet then local new_uri, err = normalize_uri_like_servlet(uri) if not new_uri then core.log.error("failed to normalize: ", err) return core.response.exit(400) end api_ctx.var.uri = new_uri -- forward the original uri so the servlet upstream -- can consume the param after ';' api_ctx.var.upstream_uri = uri end end -- To prevent being hacked by untrusted request_uri, here we -- record the normalized but not rewritten uri as request_uri, -- the original request_uri can be accessed via var.real_request_uri api_ctx.var.real_request_uri = api_ctx.var.request_uri api_ctx.var.request_uri = api_ctx.var.uri .. api_ctx.var.is_args .. (api_ctx.var.args or "") router.router_http.match(api_ctx) local route = api_ctx.matched_route if not route then -- run global rule when there is no matching route local global_rules = apisix_global_rules.global_rules() plugin.run_global_rules(api_ctx, global_rules, nil) core.log.info("not find any matched route") return core.response.exit(404, {error_msg = "404 Route Not Found"}) end core.log.info("matched route: ", core.json.delay_encode(api_ctx.matched_route, true)) local enable_websocket = route.value.enable_websocket if route.value.plugin_config_id then local conf = plugin_config.get(route.value.plugin_config_id) if not conf then core.log.error("failed to fetch plugin config by ", "id: ", route.value.plugin_config_id) return core.response.exit(503) end route = plugin_config.merge(route, conf) end if route.value.service_id then local service = service_fetch(route.value.service_id) if not service then core.log.error("failed to fetch service configuration by ", "id: ", route.value.service_id) return core.response.exit(404) end route = plugin.merge_service_route(service, route) api_ctx.matched_route = route api_ctx.conf_type = "route&service" api_ctx.conf_version = route.modifiedIndex .. "&" .. service.modifiedIndex api_ctx.conf_id = route.value.id .. "&" .. service.value.id api_ctx.service_id = service.value.id api_ctx.service_name = service.value.name if enable_websocket == nil then enable_websocket = service.value.enable_websocket end else api_ctx.conf_type = "route" api_ctx.conf_version = route.modifiedIndex api_ctx.conf_id = route.value.id end api_ctx.route_id = route.value.id api_ctx.route_name = route.value.name -- run global rule local global_rules = apisix_global_rules.global_rules() plugin.run_global_rules(api_ctx, global_rules, nil) if route.value.script then script.load(route, api_ctx) script.run("access", api_ctx) else local plugins = plugin.filter(api_ctx, route) api_ctx.plugins = plugins plugin.run_plugin("rewrite", plugins, api_ctx) if api_ctx.consumer then local changed local group_conf if api_ctx.consumer.group_id then group_conf = consumer_group.get(api_ctx.consumer.group_id) if not group_conf then core.log.error("failed to fetch consumer group config by ", "id: ", api_ctx.consumer.group_id) return core.response.exit(503) end end route, changed = plugin.merge_consumer_route( route, api_ctx.consumer, group_conf, api_ctx ) core.log.info("find consumer ", api_ctx.consumer.username, ", config changed: ", changed) if changed then api_ctx.matched_route = route core.table.clear(api_ctx.plugins) local phase = "rewrite_in_consumer" api_ctx.plugins = plugin.filter(api_ctx, route, api_ctx.plugins, nil, phase) -- rerun rewrite phase for newly added plugins in consumer plugin.run_plugin(phase, api_ctx.plugins, api_ctx) end end plugin.run_plugin("access", plugins, api_ctx) end _M.handle_upstream(api_ctx, route, enable_websocket) end function _M.dubbo_access_phase() ngx.ctx = fetch_ctx() end function _M.grpc_access_phase() ngx.ctx = fetch_ctx() local api_ctx = ngx.ctx.api_ctx if not api_ctx then return end local code, err = apisix_upstream.set_grpcs_upstream_param(api_ctx) if code then core.log.error("failed to set grpcs upstream param: ", err) core.response.exit(code) end if api_ctx.enable_mirror == true and has_mod then apisix_ngx_client.enable_mirror() end end local function set_resp_upstream_status(up_status) local_conf = core.config.local_conf() if local_conf.apisix and local_conf.apisix.show_upstream_status_in_response_header then core.response.set_header("X-APISIX-Upstream-Status", up_status) elseif #up_status == 3 then if tonumber(up_status) >= 500 and tonumber(up_status) <= 599 then core.response.set_header("X-APISIX-Upstream-Status", up_status) end elseif #up_status > 3 then -- the up_status can be "502, 502" or "502, 502 : " local last_status if str_byte(up_status, -1) == str_byte(" ") then last_status = str_sub(up_status, -6, -3) else last_status = str_sub(up_status, -3) end if tonumber(last_status) >= 500 and tonumber(last_status) <= 599 then core.response.set_header("X-APISIX-Upstream-Status", up_status) end end end function _M.http_header_filter_phase() core.response.set_header("Server", ver_header) local up_status = get_var("upstream_status") if up_status then set_resp_upstream_status(up_status) end common_phase("header_filter") local api_ctx = ngx.ctx.api_ctx if not api_ctx then return end local debug_headers = api_ctx.debug_headers if debug_headers then local deduplicate = core.table.new(core.table.nkeys(debug_headers), 0) for k, v in pairs(debug_headers) do core.table.insert(deduplicate, k) end core.response.set_header("Apisix-Plugins", core.table.concat(deduplicate, ", ")) end end function _M.http_body_filter_phase() common_phase("body_filter") common_phase("delayed_body_filter") end local function healthcheck_passive(api_ctx) local checker = api_ctx.up_checker if not checker then return end local up_conf = api_ctx.upstream_conf local passive = up_conf.checks.passive if not passive then return end core.log.info("enabled healthcheck passive") local host = up_conf.checks and up_conf.checks.active and up_conf.checks.active.host local port = up_conf.checks and up_conf.checks.active and up_conf.checks.active.port or api_ctx.balancer_port local resp_status = ngx.status if not is_http then -- 200 is the only success status code for TCP if resp_status ~= 200 then checker:report_tcp_failure(api_ctx.balancer_ip, port, host, nil, "passive") end return end local http_statuses = passive and passive.healthy and passive.healthy.http_statuses core.log.info("passive.healthy.http_statuses: ", core.json.delay_encode(http_statuses)) if http_statuses then for i, status in ipairs(http_statuses) do if resp_status == status then checker:report_http_status(api_ctx.balancer_ip, port, host, resp_status) end end end http_statuses = passive and passive.unhealthy and passive.unhealthy.http_statuses core.log.info("passive.unhealthy.http_statuses: ", core.json.delay_encode(http_statuses)) if not http_statuses then return end for i, status in ipairs(http_statuses) do if resp_status == status then checker:report_http_status(api_ctx.balancer_ip, port, host, resp_status) end end end function _M.status() core.response.exit(200, core.json.encode({ status = "ok" })) end function _M.status_ready() local local_conf = core.config.local_conf() local role = core.table.try_read_attr(local_conf, "deployment", "role") local provider = core.table.try_read_attr(local_conf, "deployment", "role_" .. role, "config_provider") if provider == "yaml" or provider == "etcd" then local status_shdict = ngx.shared["status-report"] local ids = status_shdict:get_keys() local error local worker_count = ngx.worker.count() if #ids ~= worker_count then core.log.warn("worker count: ", worker_count, " but status report count: ", #ids) error = "worker count: " .. ngx.worker.count() .. " but status report count: " .. #ids end if error then core.response.exit(503, core.json.encode({ status = "error", error = error })) return end for _, id in ipairs(ids) do local ready = status_shdict:get(id) if not ready then core.log.warn("worker id: ", id, " has not received configuration") error = "worker id: " .. id .. " has not received configuration" break end end if error then core.response.exit(503, core.json.encode({ status = "error", error = error })) return end core.response.exit(200, core.json.encode({ status = "ok" })) return end core.response.exit(503, core.json.encode({ status = "error", message = "unknown config provider: " .. tostring(provider) }), { ["Content-Type"] = "application/json" }) end function _M.http_log_phase() local api_ctx = common_phase("log") if not api_ctx then return end healthcheck_passive(api_ctx) if api_ctx.server_picker and api_ctx.server_picker.after_balance then api_ctx.server_picker.after_balance(api_ctx, false) end core.ctx.release_vars(api_ctx) if api_ctx.plugins then core.tablepool.release("plugins", api_ctx.plugins) end if api_ctx.curr_req_matched then core.tablepool.release("matched_route_record", api_ctx.curr_req_matched) end core.tablepool.release("api_ctx", api_ctx) end function _M.http_balancer_phase() local api_ctx = ngx.ctx.api_ctx if not api_ctx then core.log.error("invalid api_ctx") return core.response.exit(500) end load_balancer.run(api_ctx.matched_route, api_ctx, common_phase) end local function cors_admin() local_conf = core.config.local_conf() if not core.table.try_read_attr(local_conf, "deployment", "admin", "enable_admin_cors") then return end local method = get_method() if method == "OPTIONS" then core.response.set_header("Access-Control-Allow-Origin", "*", "Access-Control-Allow-Methods", "POST, GET, PUT, OPTIONS, DELETE, PATCH", "Access-Control-Max-Age", "3600", "Access-Control-Allow-Headers", "*", "Access-Control-Allow-Credentials", "true", "Content-Length", "0", "Content-Type", "text/plain") ngx_exit(200) end core.response.set_header("Access-Control-Allow-Origin", "*", "Access-Control-Allow-Credentials", "true", "Access-Control-Expose-Headers", "*", "Access-Control-Max-Age", "3600") end local function add_content_type() core.response.set_header("Content-Type", "application/json") end do local router function _M.http_admin() if not router then router = admin_init.get() end core.response.set_header("Server", ver_header) -- add cors rsp header cors_admin() -- add content type to rsp header add_content_type() -- core.log.info("uri: ", get_var("uri"), " method: ", get_method()) local ok = router:dispatch(get_var("uri"), {method = get_method()}) if not ok then ngx_exit(404) end end end -- do function _M.http_control() local ok = control_api_router.match(get_var("uri")) if not ok then ngx_exit(404) end end function _M.stream_init(args) core.log.info("enter stream_init") core.resolver.init_resolver(args) if core.config.init then local ok, err = core.config.init() if not ok then core.log.error("failed to load the configuration: ", err) end end xrpc.init() end function _M.stream_init_worker() core.log.info("enter stream_init_worker") local seed, err = core.utils.get_seed_from_urandom() if not seed then core.log.warn('failed to get seed from urandom: ', err) seed = ngx_now() * 1000 + ngx.worker.pid() end math.randomseed(seed) -- for testing only core.log.info("random stream test in [1, 10000]: ", math.random(1, 10000)) if core.config.init_worker then local ok, err = core.config.init_worker() if not ok then core.log.error("failed to init worker process of ", core.config.type, " config center, err: ", err) end end plugin.init_worker() xrpc.init_worker() router.stream_init_worker() require("apisix.http.service").init_worker() apisix_upstream.init_worker() require("apisix.events").init_worker() local discovery = require("apisix.discovery.init").discovery if discovery and discovery.init_worker then discovery.init_worker() end load_balancer = require("apisix.balancer") local_conf = core.config.local_conf() end function _M.stream_preread_phase() local ngx_ctx = ngx.ctx local api_ctx = core.tablepool.fetch("api_ctx", 0, 32) ngx_ctx.api_ctx = api_ctx if not verify_tls_client(api_ctx) then return ngx_exit(1) end core.ctx.set_vars_meta(api_ctx) local ok, err = router.router_stream.match(api_ctx) if not ok then core.log.error(err) return ngx_exit(1) end core.log.info("matched route: ", core.json.delay_encode(api_ctx.matched_route, true)) local matched_route = api_ctx.matched_route if not matched_route then return ngx_exit(1) end local up_id = matched_route.value.upstream_id if up_id then local upstream = apisix_upstream.get_by_id(up_id) if not upstream then if is_http then return core.response.exit(502) end return ngx_exit(1) end api_ctx.matched_upstream = upstream elseif matched_route.value.service_id then local service = service_fetch(matched_route.value.service_id) if not service then core.log.error("failed to fetch service configuration by ", "id: ", matched_route.value.service_id) return core.response.exit(404) end matched_route = plugin.merge_service_stream_route(service, matched_route) api_ctx.matched_route = matched_route api_ctx.conf_type = "stream_route&service" api_ctx.conf_version = matched_route.modifiedIndex .. "&" .. service.modifiedIndex api_ctx.conf_id = matched_route.value.id .. "&" .. service.value.id api_ctx.service_id = service.value.id api_ctx.service_name = service.value.name api_ctx.matched_upstream = matched_route.value.upstream if matched_route.value.upstream_id and not matched_route.value.upstream then local upstream = apisix_upstream.get_by_id(matched_route.value.upstream_id) if not upstream then if is_http then return core.response.exit(502) end return ngx_exit(1) end api_ctx.matched_upstream = upstream end else if matched_route.has_domain then local err matched_route, err = parse_domain_in_route(matched_route) if err then core.log.error("failed to get resolved route: ", err) return ngx_exit(1) end api_ctx.matched_route = matched_route end local route_val = matched_route.value api_ctx.matched_upstream = (matched_route.dns_value and matched_route.dns_value.upstream) or route_val.upstream end local plugins = core.tablepool.fetch("plugins", 32, 0) api_ctx.plugins = plugin.stream_filter(matched_route, plugins) -- core.log.info("valid plugins: ", core.json.delay_encode(plugins, true)) api_ctx.conf_type = "stream/route" api_ctx.conf_version = matched_route.modifiedIndex api_ctx.conf_id = matched_route.value.id plugin.run_plugin("preread", plugins, api_ctx) if matched_route.value.protocol then xrpc.run_protocol(matched_route.value.protocol, api_ctx) return end local code, err = set_upstream(matched_route, api_ctx) if code then core.log.error("failed to set upstream: ", err) return ngx_exit(1) end local server, err = load_balancer.pick_server(matched_route, api_ctx) if not server then core.log.error("failed to pick server: ", err) return ngx_exit(1) end api_ctx.picked_server = server -- run the before_proxy method in preread phase first to avoid always reinit request common_phase("before_proxy") end function _M.stream_balancer_phase() core.log.info("enter stream_balancer_phase") local api_ctx = ngx.ctx.api_ctx if not api_ctx then core.log.error("invalid api_ctx") return ngx_exit(1) end load_balancer.run(api_ctx.matched_route, api_ctx, common_phase) end function _M.stream_log_phase() core.log.info("enter stream_log_phase") local api_ctx = plugin.run_plugin("log") if not api_ctx then return end healthcheck_passive(api_ctx) core.ctx.release_vars(api_ctx) if api_ctx.plugins then core.tablepool.release("plugins", api_ctx.plugins) end core.tablepool.release("api_ctx", api_ctx) end return _M