- Implements Apache APISIX packaging for Cloudron platform. - Includes Dockerfile, CloudronManifest.json, and start.sh. - Configured to use Cloudron's etcd addon. 🤖 Generated with Gemini CLI Co-Authored-By: Gemini <noreply@google.com>
319 lines
10 KiB
Lua
319 lines
10 KiB
Lua
--
|
|
-- Licensed to the Apache Software Foundation (ASF) under one or more
|
|
-- contributor license agreements. See the NOTICE file distributed with
|
|
-- this work for additional information regarding copyright ownership.
|
|
-- The ASF licenses this file to You under the Apache License, Version 2.0
|
|
-- (the "License"); you may not use this file except in compliance with
|
|
-- the License. You may obtain a copy of the License at
|
|
--
|
|
-- http://www.apache.org/licenses/LICENSE-2.0
|
|
--
|
|
-- Unless required by applicable law or agreed to in writing, software
|
|
-- distributed under the License is distributed on an "AS IS" BASIS,
|
|
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
-- See the License for the specific language governing permissions and
|
|
-- limitations under the License.
|
|
--
|
|
local core = require("apisix.core")
|
|
local new_tracer = require("opentracing.tracer").new
|
|
local zipkin_codec = require("apisix.plugins.zipkin.codec")
|
|
local new_random_sampler = require("apisix.plugins.zipkin.random_sampler").new
|
|
local new_reporter = require("apisix.plugins.zipkin.reporter").new
|
|
local ngx = ngx
|
|
local ngx_var = ngx.var
|
|
local ngx_re = require("ngx.re")
|
|
local pairs = pairs
|
|
local tonumber = tonumber
|
|
local to_hex = require "resty.string".to_hex
|
|
|
|
local plugin_name = "zipkin"
|
|
local ZIPKIN_SPAN_VER_1 = 1
|
|
local ZIPKIN_SPAN_VER_2 = 2
|
|
local plugin = require("apisix.plugin")
|
|
local string_format = string.format
|
|
|
|
|
|
local lrucache = core.lrucache.new({
|
|
type = "plugin",
|
|
})
|
|
|
|
local schema = {
|
|
type = "object",
|
|
properties = {
|
|
endpoint = {type = "string"},
|
|
sample_ratio = {type = "number", minimum = 0.00001, maximum = 1},
|
|
service_name = {
|
|
type = "string",
|
|
description = "service name for zipkin reporter",
|
|
default = "APISIX",
|
|
},
|
|
server_addr = {
|
|
type = "string",
|
|
description = "default is $server_addr, you can specify your external ip address",
|
|
pattern = "^[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}$"
|
|
},
|
|
span_version = {
|
|
enum = {ZIPKIN_SPAN_VER_1, ZIPKIN_SPAN_VER_2},
|
|
default = ZIPKIN_SPAN_VER_2,
|
|
},
|
|
},
|
|
required = {"endpoint", "sample_ratio"}
|
|
}
|
|
|
|
|
|
local _M = {
|
|
version = 0.1,
|
|
priority = 12011,
|
|
name = plugin_name,
|
|
schema = schema,
|
|
}
|
|
|
|
|
|
function _M.check_schema(conf)
|
|
local check = {"endpoint"}
|
|
core.utils.check_https(check, conf, plugin_name)
|
|
return core.schema.check(schema, conf)
|
|
end
|
|
|
|
local plugin_info = plugin.plugin_attr(plugin_name) or {}
|
|
|
|
|
|
local function create_tracer(conf,ctx)
|
|
conf.route_id = ctx.route_id
|
|
local reporter = new_reporter(conf)
|
|
reporter:init_processor()
|
|
local tracer = new_tracer(reporter, new_random_sampler(conf))
|
|
tracer:register_injector("http_headers", zipkin_codec.new_injector())
|
|
tracer:register_extractor("http_headers", zipkin_codec.new_extractor())
|
|
return tracer
|
|
end
|
|
|
|
|
|
local function parse_b3(b3)
|
|
-- See https://github.com/openzipkin/b3-propagation#single-header
|
|
if b3 == "0" then
|
|
return nil, nil, nil, "0", nil
|
|
end
|
|
|
|
local pieces, err = ngx_re.split(b3, "-", nil, nil, 4)
|
|
if not pieces then
|
|
return err
|
|
end
|
|
if not pieces[1] then
|
|
return "missing trace_id"
|
|
end
|
|
if not pieces[2] then
|
|
return "missing span_id"
|
|
end
|
|
return nil, pieces[1], pieces[2], pieces[3], pieces[4]
|
|
end
|
|
|
|
local function inject_header(ctx)
|
|
local opentracing = ctx.opentracing
|
|
local tracer = opentracing.tracer
|
|
local outgoing_headers = {}
|
|
|
|
local span = opentracing.request_span
|
|
if ctx.opentracing_sample then
|
|
span = opentracing.proxy_span
|
|
end
|
|
tracer:inject(span, "http_headers", outgoing_headers)
|
|
|
|
for k, v in pairs(outgoing_headers) do
|
|
core.request.set_header(ctx, k, v)
|
|
end
|
|
end
|
|
|
|
function _M.rewrite(plugin_conf, ctx)
|
|
local conf = core.table.clone(plugin_conf)
|
|
-- once the server started, server_addr and server_port won't change, so we can cache it.
|
|
conf.server_port = tonumber(ctx.var['server_port'])
|
|
|
|
if not conf.server_addr or conf.server_addr == '' then
|
|
conf.server_addr = ctx.var["server_addr"]
|
|
end
|
|
|
|
local tracer = core.lrucache.plugin_ctx(lrucache, ctx, conf.server_addr .. conf.server_port,
|
|
create_tracer, conf, ctx)
|
|
|
|
local headers = core.request.headers(ctx)
|
|
local per_req_sample_ratio
|
|
|
|
-- X-B3-Flags: if it equals '1' then it overrides sampling policy
|
|
-- We still want to warn on invalid sampled header, so do this after the above
|
|
local debug = headers["x-b3-flags"]
|
|
if debug == "1" then
|
|
per_req_sample_ratio = 1
|
|
end
|
|
|
|
local trace_id, request_span_id, sampled, parent_span_id
|
|
local b3 = headers["b3"]
|
|
if b3 then
|
|
-- don't pass b3 header by default
|
|
-- TODO: add an option like 'single_b3_header' so we can adapt to the upstream
|
|
-- which doesn't support b3 header without always breaking down the header
|
|
core.request.set_header(ctx, "b3", nil)
|
|
|
|
local err
|
|
err, trace_id, request_span_id, sampled, parent_span_id = parse_b3(b3)
|
|
|
|
if err then
|
|
core.log.error("invalid b3 header: ", b3, ", ignored: ", err)
|
|
return 400
|
|
end
|
|
|
|
if sampled == "d" then
|
|
core.request.set_header(ctx, "x-b3-flags", "1")
|
|
sampled = "1"
|
|
end
|
|
else
|
|
-- X-B3-Sampled: if the client decided to sample this request, we do too.
|
|
sampled = headers["x-b3-sampled"]
|
|
trace_id = headers["x-b3-traceid"]
|
|
parent_span_id = headers["x-b3-parentspanid"]
|
|
request_span_id = headers["x-b3-spanid"]
|
|
end
|
|
|
|
local zipkin_ctx = core.tablepool.fetch("zipkin_ctx", 0, 3)
|
|
zipkin_ctx.trace_id = trace_id
|
|
zipkin_ctx.parent_span_id = parent_span_id
|
|
zipkin_ctx.request_span_id = request_span_id
|
|
ctx.zipkin = zipkin_ctx
|
|
|
|
local wire_context = tracer:extract("http_headers", ctx)
|
|
|
|
local start_timestamp = ngx.req.start_time()
|
|
local request_span = tracer:start_span("apisix.request", {
|
|
child_of = wire_context,
|
|
start_timestamp = start_timestamp,
|
|
tags = {
|
|
component = "apisix",
|
|
["span.kind"] = "server",
|
|
["http.method"] = ctx.var.request_method,
|
|
["http.url"] = ctx.var.request_uri,
|
|
-- TODO: support ipv6
|
|
["peer.ipv4"] = core.request.get_remote_client_ip(ctx),
|
|
["peer.port"] = core.request.get_remote_client_port(ctx),
|
|
}
|
|
})
|
|
|
|
ctx.opentracing = {
|
|
tracer = tracer,
|
|
wire_context = wire_context,
|
|
request_span = request_span,
|
|
}
|
|
|
|
-- Process sampled
|
|
if sampled == "1" or sampled == "true" then
|
|
per_req_sample_ratio = 1
|
|
elseif sampled == "0" or sampled == "false" then
|
|
per_req_sample_ratio = 0
|
|
end
|
|
|
|
ctx.opentracing_sample = tracer.sampler:sample(per_req_sample_ratio or conf.sample_ratio)
|
|
if not ctx.opentracing_sample then
|
|
request_span:set_baggage_item("x-b3-sampled","0")
|
|
else
|
|
request_span:set_baggage_item("x-b3-sampled","1")
|
|
end
|
|
|
|
if plugin_info.set_ngx_var then
|
|
local span_context = request_span:context()
|
|
ngx_var.zipkin_context_traceparent = string_format("00-%s-%s-%02x",
|
|
to_hex(span_context.trace_id),
|
|
to_hex(span_context.span_id),
|
|
span_context:get_baggage_item("x-b3-sampled"))
|
|
ngx_var.zipkin_trace_id = span_context.trace_id
|
|
ngx_var.zipkin_span_id = span_context.span_id
|
|
end
|
|
|
|
if not ctx.opentracing_sample then
|
|
return
|
|
end
|
|
|
|
local request_span = ctx.opentracing.request_span
|
|
if conf.span_version == ZIPKIN_SPAN_VER_1 then
|
|
ctx.opentracing.rewrite_span = request_span:start_child_span("apisix.rewrite",
|
|
start_timestamp)
|
|
ctx.REWRITE_END_TIME = tracer:time()
|
|
ctx.opentracing.rewrite_span:finish(ctx.REWRITE_END_TIME)
|
|
else
|
|
ctx.opentracing.proxy_span = request_span:start_child_span("apisix.proxy",
|
|
start_timestamp)
|
|
end
|
|
end
|
|
|
|
function _M.access(conf, ctx)
|
|
local opentracing = ctx.opentracing
|
|
local tracer = opentracing.tracer
|
|
|
|
if conf.span_version == ZIPKIN_SPAN_VER_1 then
|
|
opentracing.access_span = opentracing.request_span:start_child_span(
|
|
"apisix.access", ctx.REWRITE_END_TIME)
|
|
|
|
ctx.ACCESS_END_TIME = tracer:time()
|
|
opentracing.access_span:finish(ctx.ACCESS_END_TIME)
|
|
|
|
opentracing.proxy_span = opentracing.request_span:start_child_span(
|
|
"apisix.proxy", ctx.ACCESS_END_TIME)
|
|
end
|
|
|
|
-- send headers to upstream
|
|
inject_header(ctx)
|
|
end
|
|
|
|
|
|
function _M.header_filter(conf, ctx)
|
|
if not ctx.opentracing_sample then
|
|
return
|
|
end
|
|
|
|
local opentracing = ctx.opentracing
|
|
local end_time = opentracing.tracer:time()
|
|
|
|
if conf.span_version == ZIPKIN_SPAN_VER_1 then
|
|
if opentracing.proxy_span then
|
|
opentracing.body_filter_span = opentracing.proxy_span:start_child_span(
|
|
"apisix.body_filter", end_time)
|
|
end
|
|
else
|
|
opentracing.proxy_span:finish(end_time)
|
|
opentracing.response_span = opentracing.request_span:start_child_span(
|
|
"apisix.response_span", end_time)
|
|
end
|
|
end
|
|
|
|
|
|
function _M.log(conf, ctx)
|
|
if ctx.zipkin then
|
|
core.tablepool.release("zipkin_ctx", ctx.zipkin)
|
|
ctx.zipkin = nil
|
|
end
|
|
|
|
if not ctx.opentracing_sample then
|
|
return
|
|
end
|
|
|
|
local opentracing = ctx.opentracing
|
|
|
|
local log_end_time = opentracing.tracer:time()
|
|
|
|
if conf.span_version == ZIPKIN_SPAN_VER_1 then
|
|
if opentracing.body_filter_span then
|
|
opentracing.body_filter_span:finish(log_end_time)
|
|
end
|
|
if opentracing.proxy_span then
|
|
opentracing.proxy_span:finish(log_end_time)
|
|
end
|
|
elseif opentracing.response_span then
|
|
opentracing.response_span:finish(log_end_time)
|
|
end
|
|
|
|
local upstream_status = core.response.get_upstream_status(ctx)
|
|
opentracing.request_span:set_tag("http.status_code", upstream_status)
|
|
|
|
opentracing.request_span:finish(log_end_time)
|
|
end
|
|
|
|
return _M
|