--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements.  See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License.  You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local require            = require
local local_conf         = require("apisix.core.config_local").local_conf()
local core               = require("apisix.core")
local core_sleep         = require("apisix.core.utils").sleep
local resty_consul       = require('resty.consul')
local http               = require('resty.http')
local util               = require("apisix.cli.util")
local ipairs             = ipairs
local error              = error
local ngx                = ngx
local unpack             = unpack
local tonumber           = tonumber
local pairs              = pairs
local ngx_timer_at       = ngx.timer.at
local ngx_timer_every    = ngx.timer.every
local log                = core.log
local json_delay_encode  = core.json.delay_encode
local ngx_worker_id      = ngx.worker.id
local exiting            = ngx.worker.exiting
local thread_spawn       = ngx.thread.spawn
local thread_wait        = ngx.thread.wait
local thread_kill        = ngx.thread.kill
local math_random        = math.random
local pcall              = pcall
local null               = ngx.null
local type               = type
local next               = next

-- service name -> list of {host, port, weight} nodes, merged over all servers
local all_services = core.table.new(0, 5)
local default_service
local default_weight
local sort_type
local skip_service_map = core.table.new(0, 1)
local dump_params

local events
local events_list
-- consul server url -> services last fetched from that server
local consul_services

local default_skip_services = {"consul"}
local default_random_range = 5
local default_catalog_error_index = -1
local default_health_error_index = -2
local watch_type_catalog = 1
local watch_type_health = 2
local max_retry_time = 256

local _M = {
    version = 0.3,
}


-- Event handler registered in workers whose id is not 0: replaces the local
-- service table with the one posted by the worker that polls consul.
local function discovery_consul_callback(data, event, source, pid)
    all_services = data
    log.notice("update local variable all_services, event is: ", event,
        "source: ", source, "server pid:", pid,
        ", all services: ", json_delay_encode(all_services, true))
end


-- Returns the whole service-name -> nodes table.
function _M.all_nodes()
    return all_services
end


-- Returns the node list of `service_name`.
-- When the service is unknown, returns a one-element list holding the
-- configured default service, or nil if no default service is set.
function _M.nodes(service_name)
    if not all_services then
        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
        return
    end

    local resp_list = all_services[service_name]

    if not resp_list then
        log.error("fetch nodes failed by ", service_name, ", return default service")
        return default_service and {default_service}
    end

    log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
        json_delay_encode(resp_list, true))

    return resp_list
end


-- Replaces, inside `all_services`, everything previously contributed by
-- `consul_server_url` with `up_services`, and remembers `up_services` as that
-- server's current contribution.
local function update_all_services(consul_server_url, up_services)
    -- clean old unused data
    local old_services = consul_services[consul_server_url] or {}
    for k, _ in pairs(old_services) do
        all_services[k] = nil
    end
    core.table.clear(old_services)

    for k, v in pairs(up_services) do
        all_services[k] = v
    end
    consul_services[consul_server_url] = up_services

    log.info("update all services: ", json_delay_encode(all_services, true))
end


-- Loads `all_services` from the dump file at `dump_params.path`.
-- The file is ignored (with a log entry) when it cannot be read, is empty,
-- is not valid JSON, lacks `services`/`last_update`, or is older than
-- `dump_params.expire` seconds (an expire of 0 disables the age check).
local function read_dump_services()
    local data, err = util.read_file(dump_params.path)
    if not data then
        log.error("read dump file get error: ", err)
        return
    end

    log.info("read dump file: ", data)
    data = util.trim(data)
    if #data == 0 then
        log.error("dump file is empty")
        return
    end

    local entity, decode_err = core.json.decode(data)
    if not entity then
        log.error("decoded dump data got error: ", decode_err, ", file content: ", data)
        return
    end

    if not entity.services or not entity.last_update then
        log.warn("decoded dump data miss fields, file content: ", data)
        return
    end

    local now_time = ngx.time()
    log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
        dump_params.expire, ", now_time: ", now_time)
    if dump_params.expire ~= 0 and (entity.last_update + dump_params.expire) < now_time then
        log.warn("dump file: ", dump_params.path, " had expired, ignored it")
        return
    end

    all_services = entity.services
    log.info("load dump file into memory success")
end


-- Serialises `all_services` plus a timestamp to `dump_params.path`.
-- Used as a timer callback; any arguments passed by the timer are ignored.
local function write_dump_services()
    local entity = {
        services = all_services,
        last_update = ngx.time(),
        expire = dump_params.expire, -- later need handle it
    }
    local data = core.json.encode(entity)
    local succ, err = util.write_file(dump_params.path, data)
    if not succ then
        log.error("write dump into file got error: ", err)
    end
end


-- Control API handler: returns the raw dump file content.
-- Returns 503 with a message when dumping is not configured or the file
-- cannot be read, otherwise 200 with the file content.
local function show_dump_file()
    if not dump_params then
        return 503, "dump params is nil"
    end

    local data, err = util.read_file(dump_params.path)
    if not data then
        return 503, err
    end

    return 200, data
end


-- Computes the next retry delay in seconds: multiplies the previous delay by
-- 4, restarting at 1 when there is no previous delay or it has reached
-- `max_retry_time`.
local function get_retry_delay(retry_delay)
    if not retry_delay or retry_delay >= max_retry_time then
        retry_delay = 1
    else
        retry_delay = retry_delay * 4
    end

    return retry_delay
end


-- Builds the option table for a resty.consul client used for watching.
-- With keepalive enabled, the `wait` and `index` default query arguments are
-- added; `is_catalog` selects which stored index (catalog or health) is sent.
local function get_opts(consul_server, is_catalog)
    local opts = {
        host = consul_server.host,
        port = consul_server.port,
        connect_timeout = consul_server.connect_timeout,
        read_timeout = consul_server.read_timeout,
        default_args = {
            token = consul_server.token,
        }
    }
    if not consul_server.keepalive then
        return opts
    end

    opts.default_args.wait = consul_server.wait_timeout -- blocked wait!=0; unblocked by wait=0

    if is_catalog then
        opts.default_args.index = consul_server.catalog_index
    else
        opts.default_args.index = consul_server.health_index
    end

    return opts
end


-- Light-thread body: queries the catalog watch url until the returned
-- X-Consul-Index differs from the stored catalog index.
-- Returns `watch_type_catalog` plus either the new index header value or
-- `default_catalog_error_index` when the request failed.
local function watch_catalog(consul_server)
    local client = resty_consul:new(get_opts(consul_server, true))

    ::RETRY::
    local watch_result, watch_err = client:get(consul_server.consul_watch_catalog_url)
    local watch_error_info = (watch_err ~= nil and watch_err)
            or ((watch_result ~= nil and watch_result.status ~= 200)
            and watch_result.status)
    if watch_error_info then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_catalog_url,
            ", got watch result: ", json_delay_encode(watch_result),
            ", with error: ", watch_error_info)

        return watch_type_catalog, default_catalog_error_index
    end

    if consul_server.catalog_index > 0
            and consul_server.catalog_index == tonumber(watch_result.headers['X-Consul-Index']) then
        local random_delay = math_random(default_random_range)
        log.info("watch catalog has no change, re-watch consul after ", random_delay, " seconds")
        core_sleep(random_delay)
        goto RETRY
    end

    return watch_type_catalog, watch_result.headers['X-Consul-Index']
end


-- Light-thread body: queries the health watch url until the returned
-- X-Consul-Index differs from the stored health index.
-- Returns `watch_type_health` plus either the new index header value or
-- `default_health_error_index` when the request failed.
local function watch_health(consul_server)
    local client = resty_consul:new(get_opts(consul_server, false))

    ::RETRY::
    local watch_result, watch_err = client:get(consul_server.consul_watch_health_url)
    local watch_error_info = (watch_err ~= nil and watch_err)
            or ((watch_result ~= nil and watch_result.status ~= 200)
            and watch_result.status)
    if watch_error_info then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_health_url,
            ", got watch result: ", json_delay_encode(watch_result),
            ", with error: ", watch_error_info)

        return watch_type_health, default_health_error_index
    end

    if consul_server.health_index > 0
            and consul_server.health_index == tonumber(watch_result.headers['X-Consul-Index']) then
        local random_delay = math_random(default_random_range)
        log.info("watch health has no change, re-watch consul after ", random_delay, " seconds")
        core_sleep(random_delay)
        goto RETRY
    end

    return watch_type_health, watch_result.headers['X-Consul-Index']
end


-- Schedules the next `_M.connect` run through a zero-delay timer, but only
-- when keepalive is enabled for the server and the worker is not exiting.
local function check_keepalive(consul_server, retry_delay)
    if consul_server.keepalive and not exiting() then
        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
        if not ok then
            log.error("create ngx_timer_at got error: ", err)
            return
        end
    end
end


-- Stores the given indexes on `consul_server`. A value that is nil, not
-- numeric, or not greater than zero leaves the stored index untouched.
local function update_index(consul_server, catalog_index, health_index)
    -- tonumber() yields nil for nil / non-numeric input; fall back to 0 so the
    -- comparisons below never see nil
    local c_index = tonumber(catalog_index) or 0
    local h_index = tonumber(health_index) or 0

    if c_index > 0 then
        consul_server.catalog_index = c_index
    end

    if h_index > 0 then
        consul_server.health_index = h_index
    end
end


-- Returns false for nil, ngx.null, an empty table or an empty string;
-- true for everything else.
local function is_not_empty(value)
    if value == nil or value == null
            or (type(value) == "table" and not next(value))
            or (type(value) == "string" and value == "")
    then
        return false
    end

    return true
end


-- Tells whether a watch result carries a usable, changed index.
-- Returns false when `index` is nil or not positive (the error indexes are
-- negative), or when it equals the stored index for the given watch type.
local function watch_result_is_valid(watch_type, index, catalog_index, health_index)
    -- a missing or non-numeric header reaches here as nil; treat as invalid
    if not index or index <= 0 then
        return false
    end

    if watch_type == watch_type_catalog then
        if index == catalog_index then
            return false
        end
    else
        if index == health_index then
            return false
        end
    end

    return true
end


-- Orders nodes by host, then by port.
local function combine_sort_nodes_cmp(left, right)
    if left.host ~= right.host then
        return left.host < right.host
    end

    return left.port < right.port
end


-- Orders nodes by port.
local function port_sort_nodes_cmp(left, right)
    return left.port < right.port
end


-- Orders nodes by host.
local function host_sort_nodes_cmp(left, right)
    return left.host < right.host
end


-- Builds the node list of one service from the entries of a health-service
-- response body: one node per distinct "address:port", weighted with
-- `default_weight` and ordered according to `sort_type`.
-- Entries without a `Service` section are skipped individually.
-- Returns nil when no entry yields a node.
local function build_service_nodes(entries)
    local nodes
    local nodes_uniq = {}

    for _, entry in ipairs(entries) do
        local service = entry.Service
        if service then
            local svc_address, svc_port = service.Address, service.Port
            -- Handle nil or 0 port case - default to 80 for HTTP services
            if not svc_port or svc_port == 0 then
                svc_port = 80
            end

            if not nodes then
                nodes = core.table.new(1, 0)
            end

            -- not store duplicate service IDs.
            local service_id = svc_address .. ":" .. svc_port
            if not nodes_uniq[service_id] then
                core.table.insert(nodes, {
                    host = svc_address,
                    port = tonumber(svc_port),
                    weight = default_weight,
                })
                nodes_uniq[service_id] = true
            end
        end
    end

    if nodes then
        if sort_type == "port_sort" then
            core.table.sort(nodes, port_sort_nodes_cmp)
        elseif sort_type == "host_sort" then
            core.table.sort(nodes, host_sort_nodes_cmp)
        elseif sort_type == "combine_sort" then
            core.table.sort(nodes, combine_sort_nodes_cmp)
        end
    end

    return nodes
end


-- Timer callback performing one watch/refresh cycle against one consul server:
--   1. waits, in two light threads, for the catalog or the health index to move;
--   2. re-reads the catalog and health endpoints;
--   3. if either index differs from the stored one, fetches the passing nodes
--      of every non-skipped service, updates `all_services`, posts the update
--      event, schedules a dump write when dumping is configured and stores
--      the new indexes;
--   4. re-schedules itself through `check_keepalive`.
-- `retry_delay` is the back-off, in seconds, used by the previous failed cycle.
function _M.connect(premature, consul_server, retry_delay)
    if premature then
        return
    end

    local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog, consul_server)
    if not catalog_thread then
        local random_delay = math_random(default_random_range)
        log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
            ", retry connecting consul after ", random_delay, " seconds")
        core_sleep(random_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    local health_thread, err = thread_spawn(watch_health, consul_server)
    if not health_thread then
        thread_kill(catalog_thread)
        local random_delay = math_random(default_random_range)
        log.error("failed to spawn thread watch health: ", err,
            ", retry connecting consul after ", random_delay, " seconds")
        core_sleep(random_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    -- returns as soon as either watcher finishes; the other one is then killed
    local thread_wait_ok, watch_type, index = thread_wait(catalog_thread, health_thread)
    thread_kill(catalog_thread)
    thread_kill(health_thread)
    if not thread_wait_ok then
        local random_delay = math_random(default_random_range)
        log.error("failed to wait thread: ", watch_type,
            ", retry connecting consul after ", random_delay, " seconds")
        core_sleep(random_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    -- double check index has changed
    if not watch_result_is_valid(tonumber(watch_type),
            tonumber(index), consul_server.catalog_index, consul_server.health_index) then
        retry_delay = get_retry_delay(retry_delay)
        log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds")
        core_sleep(retry_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    local consul_client = resty_consul:new({
        host = consul_server.host,
        port = consul_server.port,
        connect_timeout = consul_server.connect_timeout,
        read_timeout = consul_server.read_timeout,
        default_args = {
            token = consul_server.token
        }
    })

    local catalog_success, catalog_res, catalog_err = pcall(function()
        return consul_client:get(consul_server.consul_watch_catalog_url)
    end)
    if not catalog_success then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_catalog_url,
            ", got catalog result: ", json_delay_encode(catalog_res))
        check_keepalive(consul_server, retry_delay)
        return
    end

    local catalog_error_info = (catalog_err ~= nil and catalog_err)
            or ((catalog_res ~= nil and catalog_res.status ~= 200)
            and catalog_res.status)
    if catalog_error_info then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_catalog_url,
            ", got catalog result: ", json_delay_encode(catalog_res),
            ", with error: ", catalog_error_info)

        retry_delay = get_retry_delay(retry_delay)
        log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds")
        core_sleep(retry_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    -- get health index
    local success, health_res, health_err = pcall(function()
        return consul_client:get(consul_server.consul_watch_health_url)
    end)
    if not success then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_health_url,
            ", got health result: ", json_delay_encode(health_res))
        check_keepalive(consul_server, retry_delay)
        return
    end

    local health_error_info = (health_err ~= nil and health_err)
            or ((health_res ~= nil and health_res.status ~= 200)
            and health_res.status)
    if health_error_info then
        log.error("connect consul: ", consul_server.consul_server_url,
            " by sub url: ", consul_server.consul_watch_health_url,
            ", got health result: ", json_delay_encode(health_res),
            ", with error: ", health_error_info)

        retry_delay = get_retry_delay(retry_delay)
        log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds")
        core_sleep(retry_delay)

        check_keepalive(consul_server, retry_delay)
        return
    end

    -- the server table has no `index` field; the stored catalog index is the
    -- counterpart of the catalog header logged next to it
    log.info("connect consul: ", consul_server.consul_server_url,
        ", catalog_result status: ", catalog_res.status,
        ", catalog_result.headers.index: ", catalog_res.headers['X-Consul-Index'],
        ", consul_server.index: ", consul_server.catalog_index,
        ", consul_server: ", json_delay_encode(consul_server))

    -- if the current index is different from the last index, then update the service
    if (consul_server.catalog_index ~= tonumber(catalog_res.headers['X-Consul-Index']))
            or (consul_server.health_index ~= tonumber(health_res.headers['X-Consul-Index'])) then
        local up_services = core.table.new(0, #catalog_res.body)
        for service_name, _ in pairs(catalog_res.body) do
            -- check if the service_name is 'skip service'
            if skip_service_map[service_name] then
                goto CONTINUE
            end

            -- get node from service
            local svc_url = consul_server.consul_sub_url .. "/" .. service_name
            local svc_success, result, get_err = pcall(function()
                return consul_client:get(svc_url, {passing = true})
            end)
            if not svc_success then
                -- when pcall fails, `result` holds the raised error
                log.error("connect consul: ", consul_server.consul_server_url,
                    ", by service url: ", svc_url,
                    ", with error: ", json_delay_encode(result))
                goto CONTINUE
            end

            local error_info = (get_err ~= nil and get_err) or
                    ((result ~= nil and result.status ~= 200) and result.status)
            if error_info then
                log.error("connect consul: ", consul_server.consul_server_url,
                    ", by service url: ", svc_url, ", with error: ", error_info)
                goto CONTINUE
            end

            -- check result body is not nil and not empty
            if is_not_empty(result.body) then
                up_services[service_name] = build_service_nodes(result.body)
            end

            :: CONTINUE ::
        end

        update_all_services(consul_server.consul_server_url, up_services)

        -- update events
        local post_ok, post_err = events:post(events_list._source,
                events_list.updating, all_services)
        if not post_ok then
            log.error("post_event failure with ", events_list._source,
                ", update all services error: ", post_err)
        end

        if dump_params then
            local timer_ok, timer_err = ngx_timer_at(0, write_dump_services)
            if not timer_ok then
                log.error("create ngx_timer_at for dump got error: ", timer_err)
            end
        end

        update_index(consul_server,
                catalog_res.headers['X-Consul-Index'],
                health_res.headers['X-Consul-Index'])
    end

    check_keepalive(consul_server, retry_delay)
end


-- Turns the `discovery.consul` configuration into a list of per-server
-- tables consumed by `_M.connect`.
-- Returns the list, or nil plus an error message when a server address is
-- not a plain `http://address:port` url.
local function format_consul_params(consul_conf)
    -- the list is filled with table.insert, so pre-size the array part
    local consul_server_list = core.table.new(#consul_conf.servers, 0)

    for _, v in pairs(consul_conf.servers) do
        local scheme, host, port, path = unpack(http.parse_uri(nil, v))
        if scheme ~= "http" then
            return nil, "only support consul http schema address, eg: http://address:port"
        elseif path ~= "/" or core.string.has_suffix(v, '/') then
            return nil, "invalid consul server address, the valid format: http://address:port"
        end
        core.table.insert(consul_server_list, {
            host = host,
            port = port,
            token = consul_conf.token,
            connect_timeout = consul_conf.timeout.connect,
            read_timeout = consul_conf.timeout.read,
            wait_timeout = consul_conf.timeout.wait,
            consul_watch_catalog_url = "/catalog/services",
            consul_sub_url = "/health/service",
            consul_watch_health_url = "/health/state/any",
            consul_server_url = v .. "/v1",
            weight = consul_conf.weight,
            keepalive = consul_conf.keepalive,
            health_index = 0,
            catalog_index = 0,
            fetch_interval = consul_conf.fetch_interval -- fetch interval to next connect consul
        })
    end

    return consul_server_list, nil
end


-- Per-worker initialisation.
-- Every worker: records the default weight / default service, optionally
-- loads the dump file and sets up the event list.
-- Workers with id ~= 0 only register the update-event callback.
-- Worker 0 additionally builds the skip list and starts one `_M.connect`
-- timer per server (plus a recurring timer when keepalive is false).
-- Raises when the server configuration is invalid or a timer cannot be created.
function _M.init_worker()
    local consul_conf = local_conf.discovery.consul

    -- `_M.nodes` falls back to `default_service` in whichever worker serves
    -- the request, so this must be set before the early return below
    default_weight = consul_conf.weight
    -- set default service, used when the server node cannot be found
    if consul_conf.default_service then
        default_service = consul_conf.default_service
        default_service.weight = default_weight
    end

    if consul_conf.dump then
        local dump = consul_conf.dump
        dump_params = dump

        if dump.load_on_init then
            read_dump_services()
        end
    end

    events = require("apisix.events")
    events_list = events:event_list(
        "discovery_consul_update_all_services",
        "updating"
    )

    if 0 ~= ngx_worker_id() then
        events:register(discovery_consul_callback, events_list._source, events_list.updating)
        return
    end

    log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
    sort_type = consul_conf.sort_type

    if consul_conf.skip_services then
        skip_service_map = core.table.new(0, #consul_conf.skip_services)
        for _, v in ipairs(consul_conf.skip_services) do
            skip_service_map[v] = true
        end
    end
    -- set up default skip service
    for _, v in ipairs(default_skip_services) do
        skip_service_map[v] = true
    end

    local consul_servers_list, err = format_consul_params(consul_conf)
    if err then
        error("format consul config got error: " .. err)
    end
    log.info("consul_server_list: ", json_delay_encode(consul_servers_list, true))

    consul_services = core.table.new(0, 1)
    -- success or failure
    for _, server in ipairs(consul_servers_list) do
        local ok, timer_err = ngx_timer_at(0, _M.connect, server)
        if not ok then
            error("create consul got error: " .. timer_err)
        end

        if server.keepalive == false then
            ngx_timer_every(server.fetch_interval, _M.connect, server)
        end
    end
end


-- Returns the consul discovery configuration together with the current
-- service table.
function _M.dump_data()
    return {config = local_conf.discovery.consul, services = all_services }
end


-- Returns the control API routes exposed by this module.
function _M.control_api()
    return {
        {
            methods = {"GET"},
            uris = {"/show_dump_file"},
            handler = show_dump_file,
        }
    }
end


return _M