#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
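# Tests for APISIX's Kafka pub-sub support: route setup through the Admin
# API, WebSocket handshake validation, offset listing and message fetching,
# TLS (with and without certificate verification), and SASL authentication.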
use Cwd qw(cwd);
use t::APISIX 'no_plan';

repeat_each(1);
no_long_string();
no_root_location();

my $apisix_home = $ENV{APISIX_HOME} // cwd();

add_block_preprocessor(sub {
    my ($block) = @_;

    # make the apisix source tree reachable from inside the test server root
    my $block_init = <<_EOC_;
    `ln -sf $apisix_home/apisix $apisix_home/t/servroot/apisix`;
_EOC_

    $block->set_value("init", $block_init);

    # default every test block to a simple GET /t request
    if (!defined $block->request) {
        $block->set_value("request", "GET /t");
    }
});

# remove the symlink after the tests run
add_test_cleanup_handler(sub {
    `rm -f $apisix_home/t/servroot/apisix`;
});

run_tests();

__DATA__

=== TEST 1: setup all-in-one test
--- config
    location /t {
        content_by_lua_block {
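            -- Each entry below creates one route via an Admin API PUT.
            -- Setting the upstream "scheme" to "kafka" marks the route as
            -- a pub-sub endpoint, which the later tests drive over
            -- WebSocket.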
            local data = {
                {
                    url = "/apisix/admin/routes/kafka",
                    data = [[{
                        "upstream": {
                            "nodes": {
                                "127.0.0.1:9092": 1
                            },
                            "type": "none",
                            "scheme": "kafka"
                        },
                        "uri": "/kafka"
                    }]],
                },
                {
                    url = "/apisix/admin/routes/kafka-invalid",
                    data = [[{
                        "upstream": {
                            "nodes": {
                                "127.0.0.1:59092": 1
                            },
                            "type": "none",
                            "scheme": "kafka"
                        },
                        "uri": "/kafka-invalid"
                    }]],
                },
                {
                    url = "/apisix/admin/routes/kafka-tlsv",
                    data = [[{
                        "upstream": {
                            "nodes": {
                                "127.0.0.1:9093": 1
                            },
                            "type": "none",
                            "scheme": "kafka",
                            "tls": {
                                "verify": true
                            }
                        },
                        "uri": "/kafka-tlsv"
                    }]],
                },
                {
                    url = "/apisix/admin/routes/kafka-tls",
                    data = [[{
                        "upstream": {
                            "nodes": {
                                "127.0.0.1:9093": 1
                            },
                            "type": "none",
                            "scheme": "kafka",
                            "tls": {
                                "verify": false
                            }
                        },
                        "uri": "/kafka-tls"
                    }]],
                },
                {
                    url = "/apisix/admin/routes/kafka-sasl",
                    data = [[{
                        "upstream": {
                            "nodes": {
                                "127.0.0.1:9094": 1
                            },
                            "type": "none",
                            "scheme": "kafka"
                        },
                        "uri": "/kafka-sasl",
                        "plugins": {
                            "kafka-proxy": {
                                "sasl": {
                                    "username": "admin",
                                    "password": "admin-secret"
                                }
                            }
                        }
                    }]],
                },
            }

            local t = require("lib.test_admin").test

            for _, route in ipairs(data) do
                local code, body = t(route.url, ngx.HTTP_PUT, route.data)
                ngx.say(body)
            end
        }
    }
--- response_body eval
"passed\n"x5


=== TEST 2: hit route (with HTTP request)
--- request
GET /kafka
--- error_code: 400
--- error_log
failed to initialize pubsub module, err: bad "upgrade" request header: nil


=== TEST 3: hit route (Kafka)
--- config
    # The messages consumed in this test are produced by the
    # linux-ci-init-service.sh script that prepares the CI environment.
    location /t {
        content_by_lua_block {
            local pb = require("pb")
            local lib_pubsub = require("lib.pubsub")
            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka")

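            -- Each command is a protobuf-encoded pub-sub frame sent over
            -- the WebSocket. For cmd_kafka_list_offset, the special
            -- timestamps follow Kafka's ListOffsets convention:
            -- -1 requests the latest offset, -2 the earliest.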
            local data = {
                {
                    sequence = 0,
                    cmd_kafka_list_offset = {
                        topic = "not-exist",
                        partition = 0,
                        timestamp = -1,
                    },
                },
                {
                    sequence = 1,
                    cmd_kafka_fetch = {
                        topic = "not-exist",
                        partition = 0,
                        offset = 0,
                    },
                },
                {
                    -- query the first message offset
                    sequence = 2,
                    cmd_kafka_list_offset = {
                        topic = "test-consumer",
                        partition = 0,
                        timestamp = -2,
                    },
                },
                {
                    -- query the last message offset
                    sequence = 3,
                    cmd_kafka_list_offset = {
                        topic = "test-consumer",
                        partition = 0,
                        timestamp = -1,
                    },
                },
                {
                    -- query by timestamp; 9999999999999 is later than the
                    -- production time of any message
                    sequence = 4,
                    cmd_kafka_list_offset = {
                        topic = "test-consumer",
                        partition = 0,
                        timestamp = "9999999999999",
                    },
                },
                {
                    -- query by timestamp; 1500000000000 is earlier than the
                    -- production time of any message
                    sequence = 5,
                    cmd_kafka_list_offset = {
                        topic = "test-consumer",
                        partition = 0,
                        timestamp = "1500000000000",
                    },
                },
                {
                    sequence = 6,
                    cmd_kafka_fetch = {
                        topic = "test-consumer",
                        partition = 0,
                        offset = 14,
                    },
                },
                {
                    sequence = 7,
                    cmd_kafka_fetch = {
                        topic = "test-consumer",
                        partition = 0,
                        offset = 999,
                    },
                },
            }

            for i = 1, #data do
                -- force clear the protobuf state before each round trip
                pb.state(nil)
                local resp = test_pubsub:send_recv_ws_binary(data[i])
                if resp.error_resp then
                    ngx.say(resp.sequence..resp.error_resp.message)
                end
                if resp.kafka_list_offset_resp then
                    ngx.say(resp.sequence.."offset: "..resp.kafka_list_offset_resp.offset)
                end
                if resp.kafka_fetch_resp then
                    ngx.say(resp.sequence.."offset: "..resp.kafka_fetch_resp.messages[1].offset..
                        " msg: "..resp.kafka_fetch_resp.messages[1].value)
                end
            end
            test_pubsub:close_ws()
        }
    }
--- response_body
0failed to list offset, topic: not-exist, partition: 0, err: not found topic
1failed to fetch message, topic: not-exist, partition: 0, err: not found topic
2offset: 0
3offset: 30
4offset: -1
5offset: 0
6offset: 14 msg: testmsg15
7failed to fetch message, topic: test-consumer, partition: 0, err: OFFSET_OUT_OF_RANGE


=== TEST 4: hit route (Kafka with invalid node ip)
--- config
    # The messages consumed in this test are produced by the
    # linux-ci-init-service.sh script that prepares the CI environment.
    location /t {
        content_by_lua_block {
            local lib_pubsub = require("lib.pubsub")
            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka-invalid")

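            -- No broker listens on 127.0.0.1:59092, so fetching the topic
            -- metadata fails and an error_resp frame comes back instead of
            -- an offset.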
            local data = test_pubsub:send_recv_ws_binary({
                sequence = 0,
                cmd_kafka_list_offset = {
                    topic = "test-consumer",
                    partition = 0,
                    timestamp = -2,
                },
            })
            if data.error_resp then
                ngx.say(data.sequence..data.error_resp.message)
            end
            test_pubsub:close_ws()
        }
    }
--- response_body
0failed to list offset, topic: test-consumer, partition: 0, err: not found topic
--- error_log
all brokers failed in fetch topic metadata


=== TEST 5: hit route (Kafka with TLS)
--- config
    location /t {
        content_by_lua_block {
            local lib_pubsub = require("lib.pubsub")
            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka-tls")

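            -- The kafka-tls route sets "tls": { "verify": false }, so the
            -- broker's self-signed certificate on port 9093 is accepted
            -- and the offset query succeeds.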
            local data = test_pubsub:send_recv_ws_binary({
                sequence = 0,
                cmd_kafka_list_offset = {
                    topic = "test-consumer",
                    partition = 0,
                    timestamp = -1,
                },
            })
            if data.kafka_list_offset_resp then
                ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset)
            end
            test_pubsub:close_ws()
        }
    }
--- response_body
0offset: 30


=== TEST 6: hit route (Kafka with TLS + ssl verify)
--- config
    location /t {
        content_by_lua_block {
            local lib_pubsub = require("lib.pubsub")
            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka-tlsv")

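            -- The kafka-tlsv route enables certificate verification
            -- ("verify": true). The CI broker presents a self-signed
            -- certificate, so the handshake is expected to fail; the test
            -- asserts only on the error log, not on a response body.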
            local data = test_pubsub:send_recv_ws_binary({
                sequence = 0,
                cmd_kafka_list_offset = {
                    topic = "test-consumer",
                    partition = 0,
                    timestamp = -1,
                },
            })
            if data.kafka_list_offset_resp then
                ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset)
            end
            test_pubsub:close_ws()
        }
    }
--- error_log eval
qr/self[- ]signed certificate/


=== TEST 7: hit route (Kafka with SASL)
--- config
    location /t {
        content_by_lua_block {
            local lib_pubsub = require("lib.pubsub")
            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka-sasl")

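            -- The kafka-sasl route carries the kafka-proxy plugin, which
            -- supplies the SASL credentials ("admin"/"admin-secret") for
            -- the broker on port 9094.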
            local data = test_pubsub:send_recv_ws_binary({
                sequence = 0,
                cmd_kafka_list_offset = {
                    topic = "test-consumer",
                    partition = 0,
                    timestamp = -1,
                },
            })
            if data.kafka_list_offset_resp then
                ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset)
            end
            test_pubsub:close_ws()
        }
    }
--- response_body
0offset: 30