- Implements Apache APISIX packaging for Cloudron platform. - Includes Dockerfile, CloudronManifest.json, and start.sh. - Configured to use Cloudron's etcd addon. 🤖 Generated with Gemini CLI Co-Authored-By: Gemini <noreply@google.com>
236 lines
7.6 KiB
Lua
236 lines
7.6 KiB
Lua
--
|
|
-- Licensed to the Apache Software Foundation (ASF) under one or more
|
|
-- contributor license agreements. See the NOTICE file distributed with
|
|
-- this work for additional information regarding copyright ownership.
|
|
-- The ASF licenses this file to You under the Apache License, Version 2.0
|
|
-- (the "License"); you may not use this file except in compliance with
|
|
-- the License. You may obtain a copy of the License at
|
|
--
|
|
-- http://www.apache.org/licenses/LICENSE-2.0
|
|
--
|
|
-- Unless required by applicable law or agreed to in writing, software
|
|
-- distributed under the License is distributed on an "AS IS" BASIS,
|
|
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
-- See the License for the specific language governing permissions and
|
|
-- limitations under the License.
|
|
--
|
|
local core = require("apisix.core")
|
|
local setmetatable = setmetatable
|
|
local timer_at = ngx.timer.at
|
|
local ipairs = ipairs
|
|
local table = table
|
|
local now = ngx.now
|
|
local type = type
|
|
-- Module table. Instances returned by batch_processor:new() use it as their
-- method table through batch_processor_mt.
local batch_processor = {}
local batch_processor_mt = {}
batch_processor_mt.__index = batch_processor
|
|
-- Forward declarations: both functions are referenced before they are
-- defined further down (they are assigned with `function name(...)`).
local execute_func, create_buffer_timer

-- Gauge of buffered entries; created on first use in batch_processor:push.
local batch_metrics

-- The Prometheus exporter is only loaded in the HTTP subsystem. Elsewhere
-- `prometheus` stays nil, so the gauge is never created and no batch metrics
-- are recorded.
local prometheus
local subsystem = ngx.config.subsystem
if subsystem == "http" then
    prometheus = require("apisix.plugins.prometheus.exporter")
end
|
|
|
|
|
|
-- Configuration accepted by batch_processor:new(). new() reads these fields
-- straight from `config` after core.schema.check succeeds; presumably the
-- check fills in the defaults below -- confirm against apisix.core.schema.
local properties = {
    -- label used in log messages and as a metric label
    name = {type = "string", default = "log buffer"},
    -- extra attempts after a failed call of the user function
    max_retry_count = {type = "integer", minimum = 0, default= 0},
    -- delay before a retry, passed to ngx.timer.at
    retry_delay = {type = "integer", minimum = 0, default= 1},
    -- flush once the oldest buffered entry is at least this old
    buffer_duration = {type = "integer", minimum = 1, default= 60},
    -- flush once no entry has arrived for this long; also the timer interval
    inactive_timeout = {type = "integer", minimum = 1, default= 5},
    -- flush as soon as the buffer holds this many entries
    batch_max_size = {type = "integer", minimum = 1, default= 1000},
}

local schema = {
    type = "object",
    properties = properties,
}

batch_processor.schema = schema
|
|
|
|
|
|
--- Schedule one processing attempt for `batch`.
-- Arms a timer that runs execute_func(premature, self, batch) after `delay`
-- (the delay argument of ngx.timer.at). If the timer cannot be created, the
-- failure is only logged; the batch is not retried from here.
-- @param self batch processor instance
-- @param delay timer delay passed to ngx.timer.at
-- @param batch table holding `entries` and `retry_count`
local function schedule_func_exec(self, delay, batch)
    local timer, timer_err = timer_at(delay, execute_func, self, batch)
    if timer then
        return
    end

    core.log.error("failed to create process timer: ", timer_err)
end
|
|
|
|
|
|
--- Publish `count` as this processor's number of buffered entries.
-- Only does anything once the shared gauge exists and the processor carries
-- all three labels (name, route_id, server_addr). The label list is stored
-- on `self.label`.
-- @param self batch processor instance
-- @param count value to set on the gauge
local function set_metrics(self, count)
    if not batch_metrics then
        return
    end

    local name, route_id, server_addr = self.name, self.route_id, self.server_addr
    if not (name and route_id and server_addr) then
        return
    end

    -- one gauge series per (name, route, server) combination
    self.label = {name, route_id, server_addr}
    batch_metrics:set(count, self.label)
end
|
|
|
|
|
|
--- Copy `batch[n..#batch]` into a new array starting at index 1.
-- @param batch array of entries
-- @param n first index to keep (defaults to 1)
-- @return new array with the tail of `batch`
local function slice_batch(batch, n)
    local start = n or 1
    local tail = {}
    for src = start, #batch do
        -- shift each element so the copy begins at index 1
        tail[src - start + 1] = batch[src]
    end
    return tail
end
|
|
|
|
|
|
--- Timer callback that hands one batch to the user function.
-- `self.func(entries, batch_max_size)` returns `ok, err, first_fail`.
-- On success every entry in the batch is added to `processed_entries`.
-- On failure with a `first_fail` index, entries 1..first_fail-1 are treated
-- as consumed: they are counted as processed and only the tail is kept.
-- Failed batches are retried after `self.retry_delay` while
-- `retry_count <= max_retry_count` and entries remain. Otherwise the leftover
-- entries are dropped, though still added to `processed_entries`.
-- Does nothing when `premature` is truthy.
function execute_func(premature, self, batch)
    if premature then
        return
    end

    local ok, err, first_fail = self.func(batch.entries, self.batch_max_size)
    if ok then
        self.processed_entries = self.processed_entries + #batch.entries
        core.log.debug("Batch Processor[", self.name,
                       "] successfully processed the entries")
        return
    end

    if not first_fail then
        core.log.error("Batch Processor[", self.name,
                       "] failed to process entries: ", err)
    else
        -- Everything before `first_fail` was consumed: account for it and
        -- keep only the unprocessed tail for the next attempt.
        local total = #batch.entries
        core.log.error("Batch Processor[", self.name, "] failed to process entries [",
                       total + 1 - first_fail, "/", total, "]: ", err)
        batch.entries = slice_batch(batch.entries, first_fail)
        self.processed_entries = self.processed_entries + first_fail - 1
    end

    local retries = batch.retry_count + 1
    batch.retry_count = retries
    local remaining = #batch.entries

    if remaining == 0 or retries > self.max_retry_count then
        -- Give up on this batch; the leftovers are counted and discarded.
        self.processed_entries = self.processed_entries + remaining
        core.log.error("Batch Processor[", self.name,"] exceeded ",
                       "the max_retry_count[", retries,
                       "] dropping the entries")
        return
    end

    schedule_func_exec(self, self.retry_delay, batch)
end
|
|
|
|
|
|
--- Timer callback armed by create_buffer_timer.
-- Flushes the buffer (and clears `is_timer_running`) when no entry has been
-- pushed for `inactive_timeout`, or the first buffered entry is at least
-- `buffer_duration` old. Otherwise it re-arms the timer. Does nothing when
-- `premature` is truthy.
local function flush_buffer(premature, self)
    if premature then
        return
    end

    local idle_expired = now() - self.last_entry_t >= self.inactive_timeout
    if not idle_expired and now() - self.first_entry_t < self.buffer_duration then
        -- Entries are still arriving and the oldest one is young enough:
        -- keep buffering and check again later.
        core.log.debug("Batch Processor[", self.name ,"] extending buffer timer")
        create_buffer_timer(self)
        return
    end

    core.log.debug("Batch Processor[", self.name ,"] buffer ",
                   "duration exceeded, activating buffer flush")
    self:process_buffer()
    self.is_timer_running = false
end
|
|
|
|
|
|
--- Arm a one-shot timer that calls flush_buffer after `self.inactive_timeout`.
-- Keeps `self.is_timer_running` in sync with whether a timer is actually
-- pending. batch_processor:push only arms a new timer while the flag is false.
-- If arming fails while re-arming from flush_buffer (the flag is still true
-- there), the flag must be cleared. Otherwise no timer is ever armed again,
-- and buffered entries are only flushed when batch_max_size is reached.
function create_buffer_timer(self)
    local hdl, err = timer_at(self.inactive_timeout, flush_buffer, self)
    if not hdl then
        -- No timer is pending: let the next push try to arm one again.
        self.is_timer_running = false
        core.log.error("failed to create buffer timer: ", err)
        return
    end

    self.is_timer_running = true
end
|
|
|
|
|
|
--- Create a batch processor.
-- @param func callback invoked as func(entries, batch_max_size); it returns
--   ok, err, first_fail (see execute_func)
-- @param config table validated against batch_processor.schema. It may also
--   carry route_id and server_addr, which this module uses only as metric labels.
-- @return processor instance, or nil plus an error message
function batch_processor:new(func, config)
    local valid, schema_err = core.schema.check(schema, config)
    if not valid then
        return nil, schema_err
    end

    if type(func) ~= "function" then
        return nil, "Invalid argument, arg #1 must be a function"
    end

    return setmetatable({
        func = func,
        -- tuning knobs taken from the validated config
        buffer_duration = config.buffer_duration,
        inactive_timeout = config.inactive_timeout,
        max_retry_count = config.max_retry_count,
        batch_max_size = config.batch_max_size,
        retry_delay = config.retry_delay,
        name = config.name,
        -- runtime state: batches waiting to be scheduled and the open buffer
        batch_to_process = {},
        entry_buffer = {entries = {}, retry_count = 0},
        is_timer_running = false,
        first_entry_t = 0,
        last_entry_t = 0,
        -- metric labels (optional)
        route_id = config.route_id,
        server_addr = config.server_addr,
        processed_entries = 0
    }, batch_processor_mt)
end
|
|
|
|
--- Add one entry to the processor.
-- With batch_max_size == 1 the entry is dispatched at once as its own batch.
-- Otherwise it is appended to the buffer. The buffer is flushed when it
-- reaches batch_max_size, or later by the buffer timer, which this call
-- arms if none is running.
function batch_processor:push(entry)
    if self.batch_max_size == 1 then
        -- nothing to accumulate: send the entry straight to processing
        schedule_func_exec(self, 0, {entries = {entry}, retry_count = 0})
        return
    end

    -- Create the shared gauge the first time a fully-labelled processor pushes.
    if prometheus and prometheus.get_prometheus() and not batch_metrics and self.name
       and self.route_id and self.server_addr then
        batch_metrics = prometheus.get_prometheus():gauge("batch_process_entries",
                                                            "batch process remaining entries",
                                                            {"name", "route_id", "server_addr"})
    end

    local buffered = self.entry_buffer.entries
    buffered[#buffered + 1] = entry
    local count = #buffered
    set_metrics(self, count)

    -- first_entry_t marks when the current buffer was opened
    if count == 1 then
        self.first_entry_t = now()
    end
    self.last_entry_t = now()

    if count >= self.batch_max_size then
        core.log.debug("Batch Processor[", self.name ,
                       "] batch max size has exceeded")
        self:process_buffer()
    end

    if not self.is_timer_running then
        create_buffer_timer(self)
    end
end
|
|
|
|
|
|
--- Schedule everything that is ready for processing.
-- A non-empty entry buffer is moved onto the pending list and replaced by a
-- fresh empty buffer, and the gauge is reset to 0. Every pending batch is
-- then scheduled with zero delay, and the pending list is cleared.
function batch_processor:process_buffer()
    local pending = self.batch_to_process
    local buffered = #self.entry_buffer.entries

    if buffered > 0 then
        core.log.debug("transferring buffer entries to processing pipe line, ",
                       "buffercount[", buffered ,"]")
        pending[#pending + 1] = self.entry_buffer
        self.entry_buffer = {entries = {}, retry_count = 0}
        set_metrics(self, 0)
    end

    for _, pending_batch in ipairs(pending) do
        schedule_func_exec(self, 0, pending_batch)
    end

    self.batch_to_process = {}
end
|
|
|
|
|
|
-- Export the module table; processors are created with batch_processor:new().
return batch_processor
|