- Implements Apache APISIX packaging for Cloudron platform. - Includes Dockerfile, CloudronManifest.json, and start.sh. - Configured to use Cloudron's etcd addon. 🤖 Generated with Gemini CLI Co-Authored-By: Gemini <noreply@google.com>
230 lines
8.4 KiB
Lua
230 lines
8.4 KiB
Lua
--
|
||
-- Licensed to the Apache Software Foundation (ASF) under one or more
|
||
-- contributor license agreements. See the NOTICE file distributed with
|
||
-- this work for additional information regarding copyright ownership.
|
||
-- The ASF licenses this file to You under the Apache License, Version 2.0
|
||
-- (the "License"); you may not use this file except in compliance with
|
||
-- the License. You may obtain a copy of the License at
|
||
--
|
||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||
--
|
||
-- Unless required by applicable law or agreed to in writing, software
|
||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
-- See the License for the specific language governing permissions and
|
||
-- limitations under the License.
|
||
--
|
||
local type = type
|
||
local pairs = pairs
|
||
local math_random = math.random
|
||
local ngx = ngx
|
||
|
||
local http = require("resty.http")
|
||
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
|
||
local core = require("apisix.core")
|
||
local str_format = core.string.format
|
||
|
||
local plugin_name = "lago"
|
||
local batch_processor_manager = bp_manager_mod.new("lago logger")
|
||
|
||
local schema = {
|
||
type = "object",
|
||
properties = {
|
||
-- core configurations
|
||
endpoint_addrs = {
|
||
type = "array",
|
||
minItems = 1,
|
||
items = core.schema.uri_def,
|
||
description = "Lago API address, like http://127.0.0.1:3000, "
|
||
.. "it supports both self-hosted and cloud. If multiple endpoints are"
|
||
.. " configured, the log will be pushed to a randomly determined"
|
||
.. " endpoint from the list.",
|
||
},
|
||
endpoint_uri = {
|
||
type = "string",
|
||
minLength = 1,
|
||
default = "/api/v1/events/batch",
|
||
description = "Lago API endpoint, it needs to be set to the batch send endpoint.",
|
||
},
|
||
token = {
|
||
type = "string",
|
||
description = "Lago API key, create one for your organization on dashboard."
|
||
},
|
||
event_transaction_id = {
|
||
type = "string",
|
||
description = "Event's transaction ID, it is used to identify and de-duplicate"
|
||
.. " the event, it supports string templates containing APISIX and"
|
||
.. " NGINX variables, like \"req_${request_id}\", which allows you"
|
||
.. " to use values returned by upstream services or request-id"
|
||
.. " plugin integration",
|
||
},
|
||
event_subscription_id = {
|
||
type = "string",
|
||
description = "Event's subscription ID, which is automatically generated or"
|
||
.. " specified by you when you assign the plan to the customer on"
|
||
.. " Lago, used to associate API consumption to a customer subscription,"
|
||
.. " it supports string templates containing APISIX and NGINX variables,"
|
||
.. " like \"cus_${consumer_name}\", which allows you to use values"
|
||
.. " returned by upstream services or APISIX consumer",
|
||
},
|
||
event_code = {
|
||
type = "string",
|
||
description = "Lago billable metric's code for associating an event to a specified"
|
||
.. "billable item",
|
||
},
|
||
event_properties = {
|
||
type = "object",
|
||
patternProperties = {
|
||
[".*"] = {
|
||
type = "string",
|
||
minLength = 1,
|
||
},
|
||
},
|
||
description = "Event's properties, used to attach information to an event, this"
|
||
.. " allows you to send certain information on a event to Lago, such"
|
||
.. " as sending HTTP status to take a failed request off the bill, or"
|
||
.. " sending the AI token consumption in the response body for accurate"
|
||
.. " billing, its keys are fixed strings and its values can be string"
|
||
.. " templates containing APISIX and NGINX variables, like \"${status}\""
|
||
},
|
||
|
||
-- connection layer configurations
|
||
ssl_verify = {type = "boolean", default = true},
|
||
timeout = {
|
||
type = "integer",
|
||
minimum = 1,
|
||
maximum = 60000,
|
||
default = 3000,
|
||
description = "timeout in milliseconds",
|
||
},
|
||
keepalive = {type = "boolean", default = true},
|
||
keepalive_timeout = {
|
||
type = "integer",
|
||
minimum = 1000,
|
||
default = 60000,
|
||
description = "keepalive timeout in milliseconds",
|
||
},
|
||
keepalive_pool = {type = "integer", minimum = 1, default = 5},
|
||
},
|
||
required = {"endpoint_addrs", "token", "event_transaction_id", "event_subscription_id",
|
||
"event_code"},
|
||
encrypt_fields = {"token"},
|
||
}
|
||
schema = batch_processor_manager:wrap_schema(schema)
|
||
|
||
-- According to https://getlago.com/docs/api-reference/events/batch, the maximum batch size is 100,
|
||
-- so we have to override the default batch size to make it work out of the box,the plugin does
|
||
-- not set a maximum limit, so if Lago relaxes the limit, then user can modify it
|
||
-- to a larger batch size
|
||
-- This does not affect other plugins, schema is appended after deep copy
|
||
schema.properties.batch_max_size.default = 100
|
||
|
||
|
||
local _M = {
|
||
version = 0.1,
|
||
priority = 415,
|
||
name = plugin_name,
|
||
schema = schema,
|
||
}
|
||
|
||
|
||
function _M.check_schema(conf, schema_type)
|
||
local check = {"endpoint_addrs"}
|
||
core.utils.check_https(check, conf, plugin_name)
|
||
core.utils.check_tls_bool({"ssl_verify"}, conf, plugin_name)
|
||
|
||
return core.schema.check(schema, conf)
|
||
end
|
||
|
||
|
||
local function send_http_data(conf, data)
|
||
local body, err = core.json.encode(data)
|
||
if not body then
|
||
return false, str_format("failed to encode json: %s", err)
|
||
end
|
||
local params = {
|
||
headers = {
|
||
["Content-Type"] = "application/json",
|
||
["Authorization"] = "Bearer " .. conf.token,
|
||
},
|
||
keepalive = conf.keepalive,
|
||
ssl_verify = conf.ssl_verify,
|
||
method = "POST",
|
||
body = body,
|
||
}
|
||
|
||
if conf.keepalive then
|
||
params.keepalive_timeout = conf.keepalive_timeout
|
||
params.keepalive_pool = conf.keepalive_pool
|
||
end
|
||
|
||
local httpc, err = http.new()
|
||
if not httpc then
|
||
return false, str_format("create http client error: %s", err)
|
||
end
|
||
httpc:set_timeout(conf.timeout)
|
||
|
||
-- select an random endpoint and build URL
|
||
local endpoint_url = conf.endpoint_addrs[math_random(#conf.endpoint_addrs)]..conf.endpoint_uri
|
||
local res, err = httpc:request_uri(endpoint_url, params)
|
||
if not res then
|
||
return false, err
|
||
end
|
||
|
||
if res.status >= 300 then
|
||
return false, str_format("lago api returned status: %d, body: %s",
|
||
res.status, res.body or "")
|
||
end
|
||
|
||
return true
|
||
end
|
||
|
||
|
||
function _M.log(conf, ctx)
|
||
-- build usage event
|
||
local event_transaction_id, err = core.utils.resolve_var(conf.event_transaction_id, ctx.var)
|
||
if err then
|
||
core.log.error("failed to resolve event_transaction_id, event dropped: ", err)
|
||
return
|
||
end
|
||
|
||
local event_subscription_id, err = core.utils.resolve_var(conf.event_subscription_id, ctx.var)
|
||
if err then
|
||
core.log.error("failed to resolve event_subscription_id, event dropped: ", err)
|
||
return
|
||
end
|
||
|
||
local entry = {
|
||
transaction_id = event_transaction_id,
|
||
external_subscription_id = event_subscription_id,
|
||
code = conf.event_code,
|
||
timestamp = ngx.req.start_time(),
|
||
}
|
||
|
||
if conf.event_properties and type(conf.event_properties) == "table" then
|
||
entry.properties = core.table.deepcopy(conf.event_properties)
|
||
for key, value in pairs(entry.properties) do
|
||
local new_val, err, n_resolved = core.utils.resolve_var(value, ctx.var)
|
||
if not err and n_resolved > 0 then
|
||
entry.properties[key] = new_val
|
||
end
|
||
end
|
||
end
|
||
|
||
if batch_processor_manager:add_entry(conf, entry) then
|
||
return
|
||
end
|
||
|
||
-- generate a function to be executed by the batch processor
|
||
local func = function(entries)
|
||
return send_http_data(conf, {
|
||
events = entries,
|
||
})
|
||
end
|
||
|
||
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
|
||
end
|
||
|
||
|
||
return _M
|