--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements.  See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License.  You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local core = require("apisix.core")
local errlog = require("ngx.errlog")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
local timers = require("apisix.timers")
local http = require("resty.http")
local producer = require("resty.kafka.producer")
local plugin_name = "error-log-logger"
local table = core.table
local schema_def = core.schema
local ngx = ngx
local tcp = ngx.socket.tcp
local tostring = tostring
local ipairs = ipairs
local string = require("string")
local lrucache = core.lrucache.new({
    ttl = 300, count = 32
})
local kafka_prod_lrucache = core.lrucache.new({
    ttl = 300, count = 32
})


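-- Plugin metadata schema: exactly one sink (tcp, skywalking, clickhouse or kafka,
-- or the legacy top-level host/port pair) must be configured, alongside the log
-- level and the batching options shared by every sink.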
local metadata_schema = {
    type = "object",
    properties = {
        tcp = {
            type = "object",
            properties = {
                host = schema_def.host_def,
                port = {type = "integer", minimum = 0},
                tls = {type = "boolean", default = false},
                tls_server_name = {type = "string"},
            },
            required = {"host", "port"}
        },
        skywalking = {
            type = "object",
            properties = {
                endpoint_addr = {schema_def.uri, default = "http://127.0.0.1:12900/v3/logs"},
                service_name = {type = "string", default = "APISIX"},
                service_instance_name = {type = "string", default = "APISIX Service Instance"},
            },
        },
        clickhouse = {
            type = "object",
            properties = {
                endpoint_addr = {schema_def.uri_def, default = "http://127.0.0.1:8123"},
                user = {type = "string", default = "default"},
                password = {type = "string", default = ""},
                database = {type = "string", default = ""},
                logtable = {type = "string", default = ""},
            },
            required = {"endpoint_addr", "user", "password", "database", "logtable"}
        },
        kafka = {
            type = "object",
            properties = {
                brokers = {
                    type = "array",
                    minItems = 1,
                    items = {
                        type = "object",
                        properties = {
                            host = {
                                type = "string",
                                description = "the host of kafka broker",
                            },
                            port = {
                                type = "integer",
                                minimum = 1,
                                maximum = 65535,
                                description = "the port of kafka broker",
                            },
                            sasl_config = {
                                type = "object",
                                description = "sasl config",
                                properties = {
                                    mechanism = {
                                        type = "string",
                                        default = "PLAIN",
                                        enum = {"PLAIN"},
                                    },
                                    user = { type = "string", description = "user" },
                                    password = { type = "string", description = "password" },
                                },
                                required = {"user", "password"},
                            },
                        },
                        required = {"host", "port"},
                    },
                    uniqueItems = true,
                },
                kafka_topic = {type = "string"},
                producer_type = {
                    type = "string",
                    default = "async",
                    enum = {"async", "sync"},
                },
                required_acks = {
                    type = "integer",
                    default = 1,
                    enum = { 0, 1, -1 },
                },
                key = {type = "string"},
                -- in lua-resty-kafka, cluster_name is defined as number
                -- see https://github.com/doujiang24/lua-resty-kafka#new-1
                cluster_name = {type = "integer", minimum = 1, default = 1},
                meta_refresh_interval = {type = "integer", minimum = 1, default = 30},
            },
            required = {"brokers", "kafka_topic"},
        },
        name = {type = "string", default = plugin_name},
        level = {type = "string", default = "WARN", enum = {"STDERR", "EMERG", "ALERT", "CRIT",
                "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"}},
        timeout = {type = "integer", minimum = 1, default = 3},
        keepalive = {type = "integer", minimum = 1, default = 30},
        batch_max_size = {type = "integer", minimum = 0, default = 1000},
        max_retry_count = {type = "integer", minimum = 0, default = 0},
        retry_delay = {type = "integer", minimum = 0, default = 1},
        buffer_duration = {type = "integer", minimum = 1, default = 60},
        inactive_timeout = {type = "integer", minimum = 1, default = 3},
    },
    oneOf = {
        {required = {"skywalking"}},
        {required = {"tcp"}},
        {required = {"clickhouse"}},
        {required = {"kafka"}},
        -- for compatible with old schema
        {required = {"host", "port"}}
    },
    encrypt_fields = {"clickhouse.password"},
}


local schema = {
    type = "object",
}


local log_level = {
    STDERR = ngx.STDERR,
    EMERG = ngx.EMERG,
    ALERT = ngx.ALERT,
    CRIT = ngx.CRIT,
    ERR = ngx.ERR,
    ERROR = ngx.ERR,
    WARN = ngx.WARN,
    NOTICE = ngx.NOTICE,
    INFO = ngx.INFO,
    DEBUG = ngx.DEBUG
}


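-- `config` holds the resolved plugin metadata (refreshed in process()) and
-- `log_buffer` the lazily created batch processor that flushes batches to the sink.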
local config = {}
local log_buffer


local _M = {
    version = 0.1,
    priority = 1091,
    name = plugin_name,
    schema = schema,
    metadata_schema = metadata_schema,
    scope = "global",
}


function _M.check_schema(conf, schema_type)
    if schema_type == core.schema.TYPE_METADATA then
        return core.schema.check(metadata_schema, conf)
    end

    local check = {"skywalking.endpoint_addr", "clickhouse.endpoint_addr"}
    core.utils.check_https(check, conf, plugin_name)
    core.utils.check_tls_bool({"tcp.tls"}, conf, plugin_name)

    return core.schema.check(schema, conf)
end


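-- Write one batch to a plain TCP server, optionally after a TLS handshake.
-- `data` is the flat array produced by the batch processor (log lines
-- interleaved with "\n" separators), which the cosocket sends as-is.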
local function send_to_tcp_server(data)
    local sock, soc_err = tcp()

    if not sock then
        return false, "failed to init the socket " .. soc_err
    end

    sock:settimeout(config.timeout * 1000)

    local tcp_config = config.tcp
    local ok, err = sock:connect(tcp_config.host, tcp_config.port)
    if not ok then
        return false, "failed to connect the TCP server: host[" .. tcp_config.host
            .. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
    end

    if tcp_config.tls then
        ok, err = sock:sslhandshake(false, tcp_config.tls_server_name, false)
        if not ok then
            sock:close()
            return false, "failed to perform TLS handshake to TCP server: host["
                .. tcp_config.host .. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
        end
    end

    local bytes, err = sock:send(data)
    if not bytes then
        sock:close()
        return false, "failed to send data to TCP server: host[" .. tcp_config.host
            .. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
    end

    sock:setkeepalive(config.keepalive * 1000)
    return true
end


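-- POST one batch to the SkyWalking log collector. Entries in `log_message`
-- alternate between a log line and a "\n" separator, so the loop steps by 2
-- to pick up only the log lines.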
local function send_to_skywalking(log_message)
    local err_msg
    local res = true
    core.log.info("sending a batch logs to ", config.skywalking.endpoint_addr)

    local httpc = http.new()
    httpc:set_timeout(config.timeout * 1000)

    local entries = {}
    local service_instance_name = config.skywalking.service_instance_name
    if service_instance_name == "$hostname" then
        service_instance_name = core.utils.gethostname()
    end

    for i = 1, #log_message, 2 do
        local content = {
            service = config.skywalking.service_name,
            serviceInstance = service_instance_name,
            endpoint = "",
            body = {
                text = {
                    text = log_message[i]
                }
            }
        }
        table.insert(entries, content)
    end

    local httpc_res, httpc_err = httpc:request_uri(
        config.skywalking.endpoint_addr,
        {
            method = "POST",
            body = core.json.encode(entries),
            keepalive_timeout = config.keepalive * 1000,
            headers = {
                ["Content-Type"] = "application/json",
            }
        }
    )

    if not httpc_res then
        return false, "error while sending data to skywalking["
            .. config.skywalking.endpoint_addr .. "] " .. httpc_err
    end

    -- some error occurred in the server
    if httpc_res.status >= 400 then
        res = false
        err_msg = string.format(
            "server returned status code[%s] skywalking[%s] body[%s]",
            httpc_res.status,
            config.skywalking.endpoint_addr,
            httpc_res.body
        )
    end

    return res, err_msg
end


local function send_to_clickhouse(log_message)
    local err_msg
    local res = true
    core.log.info("sending a batch logs to ", config.clickhouse.endpoint_addr)

    local httpc = http.new()
    httpc:set_timeout(config.timeout * 1000)

    local entries = {}
    for i = 1, #log_message, 2 do
        -- TODO Here save error log as a whole string to clickhouse 'data' column.
        -- We will add more columns in the future.
        table.insert(entries, core.json.encode({data = log_message[i]}))
    end

    local httpc_res, httpc_err = httpc:request_uri(
        config.clickhouse.endpoint_addr,
        {
            method = "POST",
            body = "INSERT INTO " .. config.clickhouse.logtable .. " FORMAT JSONEachRow "
                .. table.concat(entries, " "),
            keepalive_timeout = config.keepalive * 1000,
            headers = {
                ["Content-Type"] = "application/json",
                ["X-ClickHouse-User"] = config.clickhouse.user,
                ["X-ClickHouse-Key"] = config.clickhouse.password,
                ["X-ClickHouse-Database"] = config.clickhouse.database
            }
        }
    )

    if not httpc_res then
        return false, "error while sending data to clickhouse["
            .. config.clickhouse.endpoint_addr .. "] " .. httpc_err
    end

    -- some error occurred in the server
    if httpc_res.status >= 400 then
        res = false
        err_msg = string.format(
            "server returned status code[%s] clickhouse[%s] body[%s]",
            httpc_res.status,
            config.clickhouse.endpoint_addr,
            httpc_res.body
        )
    end

    return res, err_msg
end


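-- lrucache creator: runs whenever the plugin metadata changes, applies the
-- configured level to ngx.errlog's capture filter, and returns the metadata
-- value so it becomes the cached config.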
local function update_filter(value)
    local level = log_level[value.level]
    local status, err = errlog.set_filter_level(level)
    if not status then
        return nil, "failed to set filter level by ngx.errlog, the error is :" .. err
    else
        core.log.notice("set the filter_level to ", value.level)
    end

    return value
end


local function create_producer(broker_list, broker_config, cluster_name)
    core.log.info("create new kafka producer instance")
    return producer:new(broker_list, broker_config, cluster_name)
end


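-- Ship one batch to Kafka. The plugin metadata is re-read here (see the race
-- note below) and the producer is cached per metadata.modifiedIndex so it is
-- reused across batches.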
local function send_to_kafka(log_message)
    -- avoid race of the global config
    local metadata = plugin.plugin_metadata(plugin_name)
    if not (metadata and metadata.value and metadata.modifiedIndex) then
        return false, "please set the correct plugin_metadata for " .. plugin_name
    end
    local config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)
    if not config then
        return false, "get config failed: " .. err
    end

    core.log.info("sending a batch logs to kafka brokers: ",
                  core.json.delay_encode(config.kafka.brokers))

    local broker_config = {}
    broker_config["request_timeout"] = config.timeout * 1000
    broker_config["producer_type"] = config.kafka.producer_type
    broker_config["required_acks"] = config.kafka.required_acks
    broker_config["refresh_interval"] = config.kafka.meta_refresh_interval * 1000

    -- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions of messages in kafka
    local prod, err = kafka_prod_lrucache(plugin_name, metadata.modifiedIndex,
                                          create_producer, config.kafka.brokers, broker_config,
                                          config.kafka.cluster_name)
    if not prod then
        return false, "get kafka producer failed: " .. err
    end
    core.log.info("kafka cluster name ", config.kafka.cluster_name, ", broker_list[1] port ",
                  prod.client.broker_list[1].port)

    local ok
    for i = 1, #log_message, 2 do
        ok, err = prod:send(config.kafka.kafka_topic,
                            config.kafka.key, core.json.encode(log_message[i]))
        if not ok then
            return false, "failed to send data to Kafka topic: " .. err ..
                          ", brokers: " .. core.json.encode(config.kafka.brokers)
        end
        core.log.info("send data to kafka: ", core.json.delay_encode(log_message[i]))
    end

    return true
end


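-- Dispatch a batch to whichever sink is configured; plain TCP is the fallback
-- (covering the legacy host/port schema rewritten into config.tcp by process()).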
local function send(data)
    if config.skywalking then
        return send_to_skywalking(data)
    elseif config.clickhouse then
        return send_to_clickhouse(data)
    elseif config.kafka then
        return send_to_kafka(data)
    end
    return send_to_tcp_server(data)
end


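-- Timer handler: refresh the cached config from the plugin metadata, drain the
-- ngx.errlog buffer, drop stale entries less severe than the configured level,
-- and push the rest into the batch processor, creating it on first use.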
local function process()
    local metadata = plugin.plugin_metadata(plugin_name)
    if not (metadata and metadata.value and metadata.modifiedIndex) then
        core.log.info("please set the correct plugin_metadata for ", plugin_name)
        return
    else
        local err
        config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)
        if not config then
            core.log.warn("set log filter failed for ", err)
            return
        end
        if not (config.tcp or config.skywalking or config.clickhouse or config.kafka) then
            config.tcp = {
                host = config.host,
                port = config.port,
                tls = config.tls,
                tls_server_name = config.tls_server_name
            }
            core.log.warn(
                string.format("The schema is out of date. Please update to the new configuration, "
                              .. "for example: {\"tcp\": {\"host\": \"%s\", \"port\": \"%s\"}}",
                              config.host, config.port
            ))
        end
    end

    local err_level = log_level[metadata.value.level]
    local entries = {}
    local logs = errlog.get_logs(9)
    while (logs and #logs > 0) do
        for i = 1, #logs, 3 do
            -- There will be some stale error logs after the filter level changed.
            -- We should avoid reporting them.
            if logs[i] <= err_level then
                table.insert(entries, logs[i + 2])
                table.insert(entries, "\n")
            end
        end
        logs = errlog.get_logs(9)
    end

    if #entries == 0 then
        return
    end

    if log_buffer then
        for _, v in ipairs(entries) do
            log_buffer:push(v)
        end
        return
    end

    local config_bat = {
        name = config.name,
        retry_delay = config.retry_delay,
        batch_max_size = config.batch_max_size,
        max_retry_count = config.max_retry_count,
        buffer_duration = config.buffer_duration,
        inactive_timeout = config.inactive_timeout,
    }

    local err
    log_buffer, err = batch_processor:new(send, config_bat)

    if not log_buffer then
        core.log.warn("error when creating the batch processor: ", err)
        return
    end

    for _, v in ipairs(entries) do
        log_buffer:push(v)
    end
end


function _M.init()
    timers.register_timer("plugin#error-log-logger", process)
end


function _M.destroy()
    timers.unregister_timer("plugin#error-log-logger")
end


return _M