--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements.  See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License.  You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local new_tracer = require("opentracing.tracer").new
local zipkin_codec = require("apisix.plugins.zipkin.codec")
local new_random_sampler = require("apisix.plugins.zipkin.random_sampler").new
local new_reporter = require("apisix.plugins.zipkin.reporter").new
local ngx = ngx
local ngx_var = ngx.var
local ngx_re = require("ngx.re")
local pairs = pairs
local tonumber = tonumber
local to_hex = require "resty.string".to_hex

local plugin_name = "zipkin"
local ZIPKIN_SPAN_VER_1 = 1
local ZIPKIN_SPAN_VER_2 = 2
local plugin = require("apisix.plugin")
local string_format = string.format


local lrucache = core.lrucache.new({
    type = "plugin",
})

local schema = {
    type = "object",
    properties = {
        endpoint = {type = "string"},
        sample_ratio = {type = "number", minimum = 0.00001, maximum = 1},
        service_name = {
            type = "string",
            description = "service name for zipkin reporter",
            default = "APISIX",
        },
        server_addr = {
            type = "string",
            description = "default is $server_addr, you can specify your external ip address",
            -- the dots are escaped so that they only match a literal '.'
            -- (an unescaped '.' would accept any character between the groups)
            pattern = "^[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}$"
        },
        span_version = {
            enum = {ZIPKIN_SPAN_VER_1, ZIPKIN_SPAN_VER_2},
            default = ZIPKIN_SPAN_VER_2,
        },
    },
    required = {"endpoint", "sample_ratio"}
}


local _M = {
    version = 0.1,
    priority = 12011,
    name = plugin_name,
    schema = schema,
}


--- Validate a plugin configuration against the schema.
-- Also passes the "endpoint" field to core.utils.check_https.
-- @param conf plugin configuration table
-- @return whatever core.schema.check returns for `schema` and `conf`
function _M.check_schema(conf)
    local check = {"endpoint"}
    core.utils.check_https(check, conf, plugin_name)
    return core.schema.check(schema, conf)
end

local plugin_info = plugin.plugin_attr(plugin_name) or {}


--- Build a tracer for `conf`; passed as the constructor to the lrucache.
-- Stores ctx.route_id into conf.route_id, creates the reporter (and calls its
-- init_processor), and registers the zipkin "http_headers" injector/extractor.
-- @param conf per-request copy of the plugin configuration (mutated)
-- @param ctx  request context, only `ctx.route_id` is read
-- @return the new tracer
local function create_tracer(conf, ctx)
    conf.route_id = ctx.route_id
    local reporter = new_reporter(conf)
    reporter:init_processor()
    local tracer = new_tracer(reporter, new_random_sampler(conf))
    tracer:register_injector("http_headers", zipkin_codec.new_injector())
    tracer:register_extractor("http_headers", zipkin_codec.new_extractor())
    return tracer
end


--- Parse a B3 single header value.
-- See https://github.com/openzipkin/b3-propagation#single-header
-- @param b3 raw value of the `b3` request header
-- @return err, trace_id, span_id, sampled, parent_span_id
--         `err` is nil on success; on failure only `err` is returned.
local function parse_b3(b3)
    -- The header may carry only the sampling state: "0" (deny), "1" (accept)
    -- or "d" (debug). In that case there are no ids to extract.
    if b3 == "0" or b3 == "1" or b3 == "d" then
        return nil, nil, nil, b3, nil
    end

    local pieces, err = ngx_re.split(b3, "-", nil, nil, 4)
    if not pieces then
        return err
    end
    if not pieces[1] then
        return "missing trace_id"
    end
    if not pieces[2] then
        return "missing span_id"
    end
    return nil, pieces[1], pieces[2], pieces[3], pieces[4]
end


--- Inject the tracing headers into the request that is sent upstream.
-- The proxy span is used when the request is sampled, otherwise the
-- request span is used.
-- @param ctx request context holding `ctx.opentracing`
local function inject_header(ctx)
    local opentracing = ctx.opentracing
    local tracer = opentracing.tracer
    local outgoing_headers = {}

    local span = opentracing.request_span
    if ctx.opentracing_sample then
        span = opentracing.proxy_span
    end
    tracer:inject(span, "http_headers", outgoing_headers)

    for k, v in pairs(outgoing_headers) do
        core.request.set_header(ctx, k, v)
    end
end


--- Rewrite phase: read the incoming B3 headers, start the request span and
-- decide whether this request is sampled.
-- @param plugin_conf plugin configuration (cloned, never mutated here)
-- @param ctx         request context
-- @return 400 when the `b3` header cannot be parsed, otherwise nothing
function _M.rewrite(plugin_conf, ctx)
    local conf = core.table.clone(plugin_conf)
    -- once the server started, server_addr and server_port won't change, so we can cache it.
    conf.server_port = tonumber(ctx.var['server_port'])

    if not conf.server_addr or conf.server_addr == '' then
        conf.server_addr = ctx.var["server_addr"]
    end

    local tracer = core.lrucache.plugin_ctx(lrucache, ctx, conf.server_addr .. conf.server_port,
                                            create_tracer, conf, ctx)

    local headers = core.request.headers(ctx)
    local per_req_sample_ratio

    -- X-B3-Flags: if it equals '1' then it overrides sampling policy.
    -- An explicit sampled value processed further below may still replace it.
    local debug = headers["x-b3-flags"]
    if debug == "1" then
        per_req_sample_ratio = 1
    end

    local trace_id, request_span_id, sampled, parent_span_id
    local b3 = headers["b3"]
    if b3 then
        -- don't pass b3 header by default
        -- TODO: add an option like 'single_b3_header' so we can adapt to the upstream
        -- which doesn't support b3 header without always breaking down the header
        core.request.set_header(ctx, "b3", nil)

        local err
        err, trace_id, request_span_id, sampled, parent_span_id = parse_b3(b3)

        if err then
            core.log.error("invalid b3 header: ", b3, ", ignored: ", err)
            return 400
        end

        -- "d" is the debug sampling state: forward it as X-B3-Flags and
        -- treat the request as sampled
        if sampled == "d" then
            core.request.set_header(ctx, "x-b3-flags", "1")
            sampled = "1"
        end
    else
        -- X-B3-Sampled: if the client decided to sample this request, we do too.
        sampled = headers["x-b3-sampled"]
        trace_id = headers["x-b3-traceid"]
        parent_span_id = headers["x-b3-parentspanid"]
        request_span_id = headers["x-b3-spanid"]
    end

    -- released again in the log phase
    local zipkin_ctx = core.tablepool.fetch("zipkin_ctx", 0, 3)
    zipkin_ctx.trace_id = trace_id
    zipkin_ctx.parent_span_id = parent_span_id
    zipkin_ctx.request_span_id = request_span_id
    ctx.zipkin = zipkin_ctx

    local wire_context = tracer:extract("http_headers", ctx)

    local start_timestamp = ngx.req.start_time()
    local request_span = tracer:start_span("apisix.request", {
        child_of = wire_context,
        start_timestamp = start_timestamp,
        tags = {
            component = "apisix",
            ["span.kind"] = "server",
            ["http.method"] = ctx.var.request_method,
            ["http.url"] = ctx.var.request_uri,
             -- TODO: support ipv6
            ["peer.ipv4"] = core.request.get_remote_client_ip(ctx),
            ["peer.port"] = core.request.get_remote_client_port(ctx),
        }
    })

    ctx.opentracing = {
        tracer = tracer,
        wire_context = wire_context,
        request_span = request_span,
    }

    -- Process sampled
    if sampled == "1" or sampled == "true" then
        per_req_sample_ratio = 1
    elseif sampled == "0" or sampled == "false" then
        per_req_sample_ratio = 0
    end

    ctx.opentracing_sample = tracer.sampler:sample(per_req_sample_ratio or conf.sample_ratio)
    request_span:set_baggage_item("x-b3-sampled", ctx.opentracing_sample and "1" or "0")

    if plugin_info.set_ngx_var then
        local span_context = request_span:context()
        ngx_var.zipkin_context_traceparent = string_format("00-%s-%s-%02x",
                                             to_hex(span_context.trace_id),
                                             to_hex(span_context.span_id),
                                             span_context:get_baggage_item("x-b3-sampled"))
        ngx_var.zipkin_trace_id = span_context.trace_id
        ngx_var.zipkin_span_id = span_context.span_id
    end

    if not ctx.opentracing_sample then
        return
    end

    if conf.span_version == ZIPKIN_SPAN_VER_1 then
        ctx.opentracing.rewrite_span = request_span:start_child_span("apisix.rewrite",
                                                                     start_timestamp)

        ctx.REWRITE_END_TIME = tracer:time()
        ctx.opentracing.rewrite_span:finish(ctx.REWRITE_END_TIME)
    else
        ctx.opentracing.proxy_span = request_span:start_child_span("apisix.proxy",
                                                                   start_timestamp)
    end
end


--- Access phase: record the version 1 access/proxy spans for sampled
-- requests and always forward the tracing headers upstream.
-- @param conf plugin configuration
-- @param ctx  request context
function _M.access(conf, ctx)
    local opentracing = ctx.opentracing
    local tracer = opentracing.tracer

    -- Child spans are only needed for sampled requests: the later phases
    -- return early otherwise and inject_header falls back to the request span.
    if ctx.opentracing_sample and conf.span_version == ZIPKIN_SPAN_VER_1 then
        opentracing.access_span = opentracing.request_span:start_child_span(
            "apisix.access", ctx.REWRITE_END_TIME)

        ctx.ACCESS_END_TIME = tracer:time()
        opentracing.access_span:finish(ctx.ACCESS_END_TIME)

        opentracing.proxy_span = opentracing.request_span:start_child_span(
                "apisix.proxy", ctx.ACCESS_END_TIME)
    end

    -- send headers to upstream
    inject_header(ctx)
end


--- Header filter phase (sampled requests only).
-- Version 1: start the "apisix.body_filter" span under the proxy span, if any.
-- Version 2: finish the proxy span and start "apisix.response_span" under the
-- request span.
-- @param conf plugin configuration
-- @param ctx  request context
function _M.header_filter(conf, ctx)
    if not ctx.opentracing_sample then
        return
    end

    local opentracing = ctx.opentracing
    local end_time = opentracing.tracer:time()

    if conf.span_version == ZIPKIN_SPAN_VER_1 then
        if opentracing.proxy_span then
            opentracing.body_filter_span = opentracing.proxy_span:start_child_span(
                "apisix.body_filter", end_time)
        end
    else
        opentracing.proxy_span:finish(end_time)
        opentracing.response_span = opentracing.request_span:start_child_span(
            "apisix.response_span", end_time)
    end
end


--- Log phase: give the pooled zipkin table back and, for sampled requests,
-- finish the spans that are still open and tag the request span with the
-- upstream status.
-- @param conf plugin configuration
-- @param ctx  request context
function _M.log(conf, ctx)
    if ctx.zipkin then
        core.tablepool.release("zipkin_ctx", ctx.zipkin)
        ctx.zipkin = nil
    end

    if not ctx.opentracing_sample then
        return
    end

    local opentracing = ctx.opentracing

    local log_end_time = opentracing.tracer:time()

    if conf.span_version == ZIPKIN_SPAN_VER_1 then
        if opentracing.body_filter_span then
            opentracing.body_filter_span:finish(log_end_time)
        end
        if opentracing.proxy_span then
            opentracing.proxy_span:finish(log_end_time)
        end
    elseif opentracing.response_span then
        opentracing.response_span:finish(log_end_time)
    end

    local upstream_status = core.response.get_upstream_status(ctx)
    opentracing.request_span:set_tag("http.status_code", upstream_status)

    opentracing.request_span:finish(log_end_time)
end

return _M