-- -- Licensed to the Apache Software Foundation (ASF) under one or more -- contributor license agreements. See the NOTICE file distributed with -- this work for additional information regarding copyright ownership. -- The ASF licenses this file to You under the Apache License, Version 2.0 -- (the "License"); you may not use this file except in compliance with -- the License. You may obtain a copy of the License at -- -- http://www.apache.org/licenses/LICENSE-2.0 -- -- Unless required by applicable law or agreed to in writing, software -- distributed under the License is distributed on an "AS IS" BASIS, -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- See the License for the specific language governing permissions and -- limitations under the License. -- local ngx = ngx local ipairs = ipairs local pairs = pairs local string = string local tonumber = tonumber local tostring = tostring local os = os local error = error local pcall = pcall local setmetatable = setmetatable local is_http = ngx.config.subsystem == "http" local process = require("ngx.process") local core = require("apisix.core") local util = require("apisix.cli.util") local local_conf = require("apisix.core.config_local").local_conf() local informer_factory = require("apisix.discovery.kubernetes.informer_factory") local ctx local endpoint_lrucache = core.lrucache.new({ ttl = 300, count = 1024 }) local endpoint_buffer = {} local function sort_nodes_cmp(left, right) if left.host ~= right.host then return left.host < right.host end return left.port < right.port end local function on_endpoint_slices_modified(handle, endpoint) if handle.namespace_selector and not handle:namespace_selector(endpoint.metadata.namespace) then return end core.log.debug(core.json.delay_encode(endpoint)) core.table.clear(endpoint_buffer) local endpointslices = endpoint.endpoints for _, endpointslice in ipairs(endpointslices or {}) do if endpointslice.addresses then local addresses = endpointslices.addresses for _, port in ipairs(endpoint.ports or {}) do local port_name if port.name then port_name = port.name elseif port.targetPort then port_name = tostring(port.targetPort) else port_name = tostring(port.port) end if endpointslice.conditions and endpointslice.condition.ready then local nodes = endpoint_buffer[port_name] if nodes == nil then nodes = core.table.new(0, #endpointslices * #addresses) endpoint_buffer[port_name] = nodes end for _, address in ipairs(endpointslices.addresses) do core.table.insert(nodes, { host = address.ip, port = port.port, weight = handle.default_weight }) end end end end end for _, ports in pairs(endpoint_buffer) do for _, nodes in pairs(ports) do core.table.sort(nodes, sort_nodes_cmp) end end local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name local endpoint_content = core.json.encode(endpoint_buffer, true) local endpoint_version = ngx.crc32_long(endpoint_content) local _, err _, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version) if err then core.log.error("set endpoint version into discovery DICT failed, ", err) return end _, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content) if err then core.log.error("set endpoint into discovery DICT failed, ", err) handle.endpoint_dict:delete(endpoint_key .. "#version") end end local function on_endpoint_modified(handle, endpoint) if handle.namespace_selector and not handle:namespace_selector(endpoint.metadata.namespace) then return end core.log.debug(core.json.delay_encode(endpoint)) core.table.clear(endpoint_buffer) local subsets = endpoint.subsets for _, subset in ipairs(subsets or {}) do if subset.addresses then local addresses = subset.addresses for _, port in ipairs(subset.ports or {}) do local port_name if port.name then port_name = port.name elseif port.targetPort then port_name = tostring(port.targetPort) else port_name = tostring(port.port) end local nodes = endpoint_buffer[port_name] if nodes == nil then nodes = core.table.new(0, #subsets * #addresses) endpoint_buffer[port_name] = nodes end for _, address in ipairs(subset.addresses) do core.table.insert(nodes, { host = address.ip, port = port.port, weight = handle.default_weight }) end end end end for _, ports in pairs(endpoint_buffer) do for _, nodes in pairs(ports) do core.table.sort(nodes, sort_nodes_cmp) end end local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name local endpoint_content = core.json.encode(endpoint_buffer, true) local endpoint_version = ngx.crc32_long(endpoint_content) local _, err _, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version) if err then core.log.error("set endpoint version into discovery DICT failed, ", err) return end _, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content) if err then core.log.error("set endpoint into discovery DICT failed, ", err) handle.endpoint_dict:delete(endpoint_key .. "#version") end end local function on_endpoint_deleted(handle, endpoint) if handle.namespace_selector and not handle:namespace_selector(endpoint.metadata.namespace) then return end core.log.debug(core.json.delay_encode(endpoint)) local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name handle.endpoint_dict:delete(endpoint_key .. "#version") handle.endpoint_dict:delete(endpoint_key) end local function pre_list(handle) handle.endpoint_dict:flush_all() end local function post_list(handle) handle.endpoint_dict:flush_expired() end local function setup_label_selector(conf, informer) informer.label_selector = conf.label_selector end local function setup_namespace_selector(conf, informer) local ns = conf.namespace_selector if ns == nil then informer.namespace_selector = nil return end if ns.equal then informer.field_selector = "metadata.namespace=" .. ns.equal informer.namespace_selector = nil return end if ns.not_equal then informer.field_selector = "metadata.namespace!=" .. ns.not_equal informer.namespace_selector = nil return end if ns.match then informer.namespace_selector = function(self, namespace) local match = conf.namespace_selector.match local m, err for _, v in ipairs(match) do m, err = ngx.re.match(namespace, v, "jo") if m and m[0] == namespace then return true end if err then core.log.error("ngx.re.match failed: ", err) end end return false end return end if ns.not_match then informer.namespace_selector = function(self, namespace) local not_match = conf.namespace_selector.not_match local m, err for _, v in ipairs(not_match) do m, err = ngx.re.match(namespace, v, "jo") if m and m[0] == namespace then return false end if err then return false end end return true end return end return end local function read_env(key) if #key > 3 then local first, second = string.byte(key, 1, 2) if first == string.byte('$') and second == string.byte('{') then local last = string.byte(key, #key) if last == string.byte('}') then local env = string.sub(key, 3, #key - 1) local value = os.getenv(env) if not value then return nil, "not found environment variable " .. env end return value end end end return key end local function read_token(token_file) local token, err = util.read_file(token_file) if err then return nil, err end -- remove possible extra whitespace return util.trim(token) end local function get_apiserver(conf) local apiserver = { schema = "", host = "", port = "", } apiserver.schema = conf.service.schema if apiserver.schema ~= "http" and apiserver.schema ~= "https" then return nil, "service.schema should set to one of [http,https] but " .. apiserver.schema end local err apiserver.host, err = read_env(conf.service.host) if err then return nil, err end if apiserver.host == "" then return nil, "service.host should set to non-empty string" end local port port, err = read_env(conf.service.port) if err then return nil, err end apiserver.port = tonumber(port) if not apiserver.port or apiserver.port <= 0 or apiserver.port > 65535 then return nil, "invalid port value: " .. apiserver.port end if conf.client.token then local token, err = read_env(conf.client.token) if err then return nil, err end apiserver.token = util.trim(token) elseif conf.client.token_file and conf.client.token_file ~= "" then setmetatable(apiserver, { __index = function(_, key) if key ~= "token" then return end local token_file, err = read_env(conf.client.token_file) if err then core.log.error("failed to read token file path: ", err) return end local token, err = read_token(token_file) if err then core.log.error("failed to read token from file: ", err) return end core.log.debug("re-read the token value") return token end }) else return nil, "one of [client.token,client.token_file] should be set but none" end if apiserver.schema == "https" and apiserver.token == "" then return nil, "apiserver.token should set to non-empty string when service.schema is https" end return apiserver end local function create_endpoint_lrucache(endpoint_dict, endpoint_key, endpoint_port) local endpoint_content = endpoint_dict:get_stale(endpoint_key) if not endpoint_content then core.log.error("get empty endpoint content from discovery DIC, this should not happen ", endpoint_key) return nil end local endpoint = core.json.decode(endpoint_content) if not endpoint then core.log.error("decode endpoint content failed, this should not happen, content: ", endpoint_content) return nil end return endpoint[endpoint_port] end local _M = { version = "0.0.1" } local function start_fetch(handle) local timer_runner timer_runner = function(premature) if premature then return end local ok, status = pcall(handle.list_watch, handle, handle.apiserver) local retry_interval = 0 if not ok then core.log.error("list_watch failed, kind: ", handle.kind, ", reason: ", "RuntimeException", ", message : ", status) retry_interval = 40 elseif not status then retry_interval = 40 end ngx.timer.at(retry_interval, timer_runner) end ngx.timer.at(0, timer_runner) end local function get_endpoint_dict(id) local shm = "kubernetes" if id and #id > 0 then shm = shm .. "-" .. id end if not is_http then shm = shm .. "-stream" end return ngx.shared[shm] end local function single_mode_init(conf) local endpoint_dict = get_endpoint_dict() if not endpoint_dict then error("failed to get lua_shared_dict: ngx.shared.kubernetes, " .. "please check your APISIX version") end if process.type() ~= "privileged agent" then ctx = endpoint_dict return end local apiserver, err = get_apiserver(conf) if err then error(err) return end local default_weight = conf.default_weight local endpoints_informer, err if conf.watch_endpoint_slices_schema then endpoints_informer, err = informer_factory.new("discovery.k8s.io", "v1", "EndpointSlice", "endpointslices", "") else endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "") end if err then error(err) return end setup_namespace_selector(conf, endpoints_informer) setup_label_selector(conf, endpoints_informer) if conf.watch_endpoint_slices_schema then endpoints_informer.on_added = on_endpoint_slices_modified endpoints_informer.on_modified = on_endpoint_slices_modified else endpoints_informer.on_added = on_endpoint_modified endpoints_informer.on_modified = on_endpoint_modified end endpoints_informer.on_deleted = on_endpoint_deleted endpoints_informer.pre_list = pre_list endpoints_informer.post_list = post_list ctx = setmetatable({ endpoint_dict = endpoint_dict, apiserver = apiserver, default_weight = default_weight }, { __index = endpoints_informer }) start_fetch(ctx) end local function single_mode_nodes(service_name) local pattern = "^(.*):(.*)$" -- namespace/name:port_name local match = ngx.re.match(service_name, pattern, "jo") if not match then core.log.error("get unexpected upstream service_name: ", service_name) return nil end local endpoint_dict = ctx local endpoint_key = match[1] local endpoint_port = match[2] local endpoint_version = endpoint_dict:get_stale(endpoint_key .. "#version") if not endpoint_version then core.log.info("get empty endpoint version from discovery DICT ", endpoint_key) return nil end return endpoint_lrucache(service_name, endpoint_version, create_endpoint_lrucache, endpoint_dict, endpoint_key, endpoint_port) end local function multiple_mode_worker_init(confs) for _, conf in ipairs(confs) do local id = conf.id if ctx[id] then error("duplicate id value") end local endpoint_dict = get_endpoint_dict(id) if not endpoint_dict then error(string.format("failed to get lua_shared_dict: ngx.shared.kubernetes-%s, ", id) .. "please check your APISIX version") end ctx[id] = endpoint_dict end end local function multiple_mode_init(confs) ctx = core.table.new(#confs, 0) if process.type() ~= "privileged agent" then multiple_mode_worker_init(confs) return end for _, conf in ipairs(confs) do local id = conf.id if ctx[id] then error("duplicate id value") end local endpoint_dict = get_endpoint_dict(id) if not endpoint_dict then error(string.format("failed to get lua_shared_dict: ngx.shared.kubernetes-%s, ", id) .. "please check your APISIX version") end local apiserver, err = get_apiserver(conf) if err then error(err) return end local default_weight = conf.default_weight local endpoints_informer, err if conf.watch_endpoint_slices_schema then endpoints_informer, err = informer_factory.new("discovery.k8s.io", "v1", "EndpointSlice", "endpointslices", "") else endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "") end if err then error(err) return end setup_namespace_selector(conf, endpoints_informer) setup_label_selector(conf, endpoints_informer) if conf.watch_endpoint_slices_schema then endpoints_informer.on_added = on_endpoint_slices_modified endpoints_informer.on_modified = on_endpoint_slices_modified else endpoints_informer.on_added = on_endpoint_modified endpoints_informer.on_modified = on_endpoint_modified end endpoints_informer.on_deleted = on_endpoint_deleted endpoints_informer.pre_list = pre_list endpoints_informer.post_list = post_list ctx[id] = setmetatable({ endpoint_dict = endpoint_dict, apiserver = apiserver, default_weight = default_weight }, { __index = endpoints_informer }) end for _, item in pairs(ctx) do start_fetch(item) end end local function multiple_mode_nodes(service_name) local pattern = "^(.*)/(.*/.*):(.*)$" -- id/namespace/name:port_name local match = ngx.re.match(service_name, pattern, "jo") if not match then core.log.error("get unexpected upstream service_name: ", service_name) return nil end local id = match[1] local endpoint_dict = ctx[id] if not endpoint_dict then core.log.error("id not exist") return nil end local endpoint_key = match[2] local endpoint_port = match[3] local endpoint_version = endpoint_dict:get_stale(endpoint_key .. "#version") if not endpoint_version then core.log.info("get empty endpoint version from discovery DICT ", endpoint_key) return nil end return endpoint_lrucache(service_name, endpoint_version, create_endpoint_lrucache, endpoint_dict, endpoint_key, endpoint_port) end function _M.init_worker() local discovery_conf = local_conf.discovery.kubernetes core.log.info("kubernetes discovery conf: ", core.json.delay_encode(discovery_conf)) if #discovery_conf == 0 then _M.nodes = single_mode_nodes single_mode_init(discovery_conf) else _M.nodes = multiple_mode_nodes multiple_mode_init(discovery_conf) end end local function dump_endpoints_from_dict(endpoint_dict) local keys, err = endpoint_dict:get_keys(0) if err then core.log.error("get keys from discovery dict failed: ", err) return end if not keys or #keys == 0 then return end local endpoints = {} for i = 1, #keys do local key = keys[i] -- skip key with suffix #version if key:sub(-#"#version") ~= "#version" then local value = endpoint_dict:get(key) core.table.insert(endpoints, { name = key, value = value }) end end return endpoints end function _M.dump_data() local discovery_conf = local_conf.discovery.kubernetes local eps = {} if #discovery_conf == 0 then -- Single mode: discovery_conf is a single configuration object local endpoint_dict = get_endpoint_dict() local endpoints = dump_endpoints_from_dict(endpoint_dict) if endpoints then core.table.insert(eps, { endpoints = endpoints }) end else -- Multiple mode: discovery_conf is an array of configuration objects for _, conf in ipairs(discovery_conf) do local endpoint_dict = get_endpoint_dict(conf.id) local endpoints = dump_endpoints_from_dict(endpoint_dict) if endpoints then core.table.insert(eps, { id = conf.id, endpoints = endpoints }) end end end return {config = discovery_conf, endpoints = eps} end return _M