unetmsg: add unet pub/sub message broker based on ubus

This service automatically establishes connections to any hosts that are members
of the same unet network, and allows publish/subscribe exchanges via ubus channels.

Signed-off-by: Felix Fietkau <nbd@nbd.name>
This commit is contained in:
Felix Fietkau 2025-03-07 18:20:23 +01:00
parent 5d40123818
commit 77f8a70f65
9 changed files with 1223 additions and 2 deletions

View File

@ -1024,13 +1024,23 @@ const host_editor = {
const UnetHostEdit = editor.new(host_editor);
function is_vxlan_service(ctx, argv, named, spec)
function has_service_type(ctx, named, name)
{
let type = named.type;
if (ctx.data.edit)
type ??= ctx.data.edit.type;
return type == "vxlan";
return type == name;
}
function is_vxlan_service(ctx, argv, named, spec)
{
return has_service_type(ctx, named, "vxlan");
}
function is_unetmsg_service(ctx, argv, named, spec)
{
return has_service_type(ctx, named, "unetmsg");
}
function get_config_object(ctx, spec, obj, argv)
@ -1106,6 +1116,16 @@ const service_editor = {
value: (ctx) => keys(ctx.data.netdata.json.hosts)
}
},
"unetmsg-allowed": {
help: "Allowed topics for this unetmsg service group",
attribute: "allowed",
available: is_unetmsg_service,
get_object: get_config_object,
multiple: true,
args: {
type: "string"
}
},
}
};

View File

@ -0,0 +1,35 @@
#
# Copyright (C) 2025 OpenWrt.org
#
# This is free software, licensed under the GNU General Public License v2.
# See /LICENSE for more information.
#
include $(TOPDIR)/rules.mk
PKG_NAME:=unetmsg
PKG_RELEASE:=$(AUTORELEASE)
PKG_LICENSE:=GPL-2.0
PKG_MAINTAINER:=Felix Fietkau <nbd@nbd.name>
include $(INCLUDE_DIR)/package.mk
define Package/unetmsg
SECTION:=utils
CATEGORY:=Utilities
TITLE:=unet network pub/sub message broker
DEPENDS:=+ucode +ucode-mod-socket \
+ucode-mod-ubus +ucode-mod-uloop \
+ucode-mod-fs
endef
define Build/Compile
:
endef
define Package/unetmsg/install
$(CP) ./files/* $(1)/
endef
$(eval $(call BuildPackage,unetmsg))

View File

@ -0,0 +1,14 @@
#!/bin/sh /etc/rc.common
# Copyright (c) 2021 OpenWrt.org
START=50
USE_PROCD=1
PROG=/usr/sbin/unetmsgd
start_service() {
procd_open_instance
procd_set_param command "$PROG"
procd_set_param respawn
procd_close_instance
}

View File

@ -0,0 +1,68 @@
#!/usr/bin/env ucode
// SPDX-License-Identifier: GPL-2.0+
/*
* Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
*/
'use strict';
import * as libubus from "ubus";
import * as uloop from "uloop";
import * as unetmsg_core from "unetmsg.unetmsgd";
uloop.init();
let ubus = libubus.connect();
if (!ubus) {
warn(`Failed to connect to ubus\n`);
exit(1);
}
let core = unetmsg_core.init(ubus, true);
function update_acl() {
let data = ubus.call(libubus.SYSTEM_OBJECT_ACL, "query");
core.acl_set(data.acl);
}
let obj = ubus.publish("unetmsg", {
channel: {
args: {},
call: function(req) {
if (!core.client.new(req))
return libubus.STATUS_INVALID_ARGUMENT;
return 0;
}
},
list: {
args: {
name: "",
},
call: function(req) {
let ret = [];
for (let name in core.publish)
if (req.args.name == null || wildcard(name, req.args.name))
push(ret, name);
return {
id: sort(ret),
};
},
},
request: {
args: {
name: "",
type: "",
data: {},
},
call: function(req) {
try {
core.handle_request(null, req, req.args, true);
} catch (e) {
core.exception(e);
}
}
}
});
ubus.subscriber("ubus.acl.sequence", () => update_acl());
update_acl();
uloop.run();

View File

@ -0,0 +1,159 @@
// SPDX-License-Identifier: GPL-2.0+
/*
* Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
*/
'use strict';
import * as libubus from "ubus";
import * as uloop from "uloop";
function publish(name, request_cb)
{
if (!this.channel)
this.connect();
if (type(name) == "string")
name = [ name ];
for (let cur in name)
this.cb_pub[cur] = request_cb;
if (!this.channel)
return;
this.channel.request("publish", { name });
}
function subscribe(name, message_cb)
{
if (!this.channel)
this.connect();
if (type(name) == "string")
name = [ name ];
for (let cur in name)
this.cb_sub[cur] = message_cb;
if (!this.channel)
return;
this.channel.request("subscribe", { name });
}
function send(name, type, data)
{
this.channel.request({
method: "message",
return: "ignore",
data: {
name, type, data
},
});
}
function default_complete_cb()
{
}
function request(name, type, data, data_cb, complete_cb)
{
if (!this.channel)
this.connect();
if (!this.channel)
return;
let req = this.channel.defer({
method: "request",
data: {
name, type, data
},
data_cb,
cb: complete_cb
});
if (!complete_cb)
req.await();
}
function connect()
{
if (this.channel)
return;
let cl = this;
let res = cl.ubus.call({
object: "unetmsg",
method: "channel",
fd_cb: (fd) => {
cl.channel = libubus.open_channel(fd, cl.request_cb, cl.disconnect_cb, cl.timeout);
}
});
if (!this.channel) {
this.connect_timer.set(1000);
return;
}
if (length(this.cb_pub) > 0)
this.channel.request("publish", {
name: keys(this.cb_pub)
});
if (length(this.cb_sub) > 0)
this.channel.request("subscribe", {
name: keys(this.cb_sub)
});
}
const client_proto = {
connect, publish, subscribe, send, request,
close: function() {
if (this.channel)
this.channel.disconnect();
this.connect_timer.cancel();
for (let name in keys(this))
delete this[name];
}
};
function handle_request(cl, req)
{
let cb;
switch (req.type) {
case "message":
cb = cl.cb_sub[req.args.name];
if (cb)
return cb(req);
break;
case "request":
cb = cl.cb_pub[req.args.name];
if (cb)
return cb(req);
}
return 0;
}
export function open(ubus_conn, timeout)
{
let cl = proto({
cb_sub: {},
cb_pub: {},
ubus: ubus_conn,
timeout,
}, client_proto);
cl.request_cb = (req) => {
return handle_request(cl, req);
};
cl.disconnect_cb = () => {
cl.channel = null;
cl.connect_timer.set(100);
};
cl.connect_timer = uloop.timer(1, () => cl.connect());
return cl;
};

View File

@ -0,0 +1,127 @@
// SPDX-License-Identifier: GPL-2.0+
/*
* Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
*/
'use strict';
import * as libubus from "ubus";
import { gen_id } from "./utils.uc";
let core;
let clients = {};
const pubsub_proto = {
get_channel: function() {
let cl = clients[this.client];
if (!cl)
return;
return cl.chan;
}
};
function client_pubsub(kind, cl, names)
{
if (type(names) != "array")
return libubus.STATUS_INVALID_ARGUMENT;
for (let cur in names) {
if (type(cur) != "string")
return libubus.STATUS_INVALID_ARGUMENT;
}
if (!core.acl_check(kind, cl.acl, names))
return libubus.STATUS_PERMISSION_DENIED;
let cl_list = cl[kind];
for (let name in names) {
if (cl_list[name])
continue;
cl_list[name] = core.pubsub_add(kind, name, proto({
client: cl.id,
}, pubsub_proto));
}
return 0;
}
function prepare_data(args)
{
return {
name: args.name,
type: args.type,
data: args.data,
};
}
function client_request(cl, req)
{
let args = req.args;
let name = args.name;
if (type(name) != "string" || type(args.type) != "string" || type(args.data) != "object")
return libubus.STATUS_INVALID_ARGUMENT;
let data = prepare_data(req.args);
let handle;
switch (req.type) {
case "message":
handle = cl.publish[name];
if (!handle)
return libubus.STATUS_INVALID_ARGUMENT;
return core.handle_message(handle, data, true);
case "request":
handle = cl.subscribe[name];
if (!handle)
return libubus.STATUS_INVALID_ARGUMENT;
return core.handle_request(handle, req, data, true);
}
}
function client_cb(cl, req)
{
let args = req.args;
switch (req.type) {
case "publish":
case "subscribe":
return client_pubsub(req.type, cl, args.name);
case "message":
case "request":
return client_request(cl, req);
}
}
function client_disconnect(id)
{
let cl = clients[id];
if (!cl)
return;
for (let kind in [ "publish", "subscribe" ])
for (let name, data in cl[kind])
core.pubsub_del(kind, name, data);
delete clients[id];
}
export function new(req)
{
let id = gen_id();
let acl = req.info.acl;
let client = {
id, acl,
publish: {},
subscribe: {},
};
let cb = (req) => client_cb(client, req);
let disconnect_cb = () => client_disconnect(id);
client.chan = req.new_channel(cb, disconnect_cb);
clients[id] = client;
return client;
};
export function set_core(_core)
{
core = _core;
};

View File

@ -0,0 +1,530 @@
// SPDX-License-Identifier: GPL-2.0+
/*
* Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
*/
'use strict';
import * as libubus from "ubus";
import * as uloop from "uloop";
import * as socket from "socket";
import { gen_id, is_equal } from "./utils.uc";
let core, ubus;
let local_id = gen_id();
let ev_listener, sub;
let networks = {};
const USYNC_PORT = 51818;
const pubsub_proto = {
get_channel: function() {
let net = networks[this.network];
if (!net)
return;
let sock_data = net.tx_channels[this.name];
if (!sock_data)
return;
return sock_data.channel;
},
get_response_data: function(data) {
data.network = this.network,
data.host = this.name;
return data;
}
};
function network_socket_close(data)
{
if (!data)
return;
if (data.timer)
data.timer.cancel();
data.channel.disconnect();
data.socket.close();
}
function network_rx_socket_close(data)
{
if (!data)
return;
core.dbg(`Incoming connection from ${data.name} closed\n`);
let net = networks[data.network];
if (net && net.rx_channels[data.name] == data)
delete net.rx_channels[data.name];
for (let name, sub in core.remote_subscribe)
delete sub[data.name];
for (let name, sub in core.remote_publish)
delete sub[data.name];
network_socket_close(data);
}
function network_tx_socket_close(data)
{
if (!data)
return;
core.dbg(`Outgoing connection to ${data.name} closed\n`);
let net = networks[data.network];
if (net && net.tx_channels[data.name] == data)
delete net.tx_channels[data.name];
network_socket_close(data);
}
function network_socket_handle_request(sock_data, req)
{
let net = networks[sock_data.network];
if (!net)
return;
let msgtype = req.type;
let host = sock_data.name;
let network = sock_data.network;
let args = { ...req.args, host, network };
switch (msgtype) {
case "publish":
case "subscribe":
let list = sock_data[msgtype];
let name = args.name;
if (!name)
return;
if (args.enabled) {
if (list[name])
return 0;
let allowed;
for (let cur in net.peers[host].allowed) {
if (!wildcard(name, cur))
continue;
allowed = true;
break;
}
if (!allowed)
return 0;
core["remote_" + msgtype][name] ??= {};
core["remote_" + msgtype][name][host] = proto({
network: sock_data.network,
name: host,
}, pubsub_proto);
list[name] = true;
} else {
if (!list[name])
return 0;
delete core["remote_" + msgtype][name][host];
delete list[name];
}
break;
case "request":
return core.handle_request(null, req, args);
case "message":
core.handle_message(null, args);
return 0;
}
return 0;
}
function network_auth_token(net, host, id)
{
let auth_data = ubus.call("unetd", "token_create", {
network: net,
target: host,
data: { id }
});
if (!auth_data)
return;
return auth_data.token;
}
function network_auth_valid(host, id, token)
{
if (!token)
return;
let data = ubus.call("unetd", "token_parse", { token });
if (!data)
return;
if (data.host != host)
return;
if (data.user != "root")
return;
data = data.data;
if (data.id != id)
return;
return true;
}
function network_check_auth(sock_data, info)
{
if (!network_auth_valid(sock_data.name, sock_data.id, info.token))
return;
let net = networks[sock_data.network];
if (!net)
return;
if (!net.peers[sock_data.name])
return;
network_rx_socket_close(net.rx_channels[sock_data.name]);
if (sock_data.timer)
sock_data.timer.cancel();
sock_data.auth = true;
net.rx_channels[sock_data.name] = sock_data;
core.dbg(`Incoming connection from ${sock_data.name} established\n`);
if (!net.tx_channels[sock_data.name])
net.timer.set(100);
}
function network_accept(net, sock, addr)
{
let src = addr.address;
let name;
for (let cur_name, data in net.peers)
if (data.address == src)
name = cur_name;
if (!name) {
core.dbg(`No peer found for address ${src}\n`);
sock.close();
return;
}
let sock_data = {
network: net.name,
socket: sock,
publish: {},
subscribe: {},
name,
};
let cb = (req) => {
if (!sock_data.auth) {
if (req.type == "hello") {
sock_data.id = req.args.id;
return;
}
if (req.type == "auth")
network_check_auth(sock_data, req.args);
if (!sock_data.auth) {
warn(`Auth failed\n`);
network_rx_socket_close(sock_data);
return 0;
}
let token = network_auth_token(net.name, name, req.args.id);
if (!token) {
warn(`Failed to generate auth reply token\n`);
return 0;
}
req.reply({ token }, -1);
return 0;
}
return network_socket_handle_request(sock_data, req);
};
let disconnect_cb = (req) => {
network_rx_socket_close(sock_data);
};
sock_data.id = gen_id();
sock_data.timer = uloop.timer(10 * 1000, () => {
network_socket_close(sock_data);
});
sock_data.channel = libubus.open_channel(sock, cb, disconnect_cb);
sock_data.channel.request({
method: "hello",
data: { id: sock_data.id },
return: "ignore",
});
}
function network_open_channel(net, name, peer)
{
network_tx_socket_close(net.tx_channels[name]);
let sock_data = {
network: net.name,
name
};
let addr = socket.sockaddr({
address: peer.address,
port: USYNC_PORT
});
if (!addr)
return;
let sock = socket.create(socket.AF_INET6, socket.SOCK_STREAM | socket.SOCK_NONBLOCK);
if (!sock)
return;
core.dbg(`Try to connect to ${name}\n`);
sock.connect(addr);
let auth_data_cb = (msg) => {
if (!network_auth_valid(sock_data.name, sock_data.id, msg.token))
return;
sock_data.auth = true;
core.dbg(`Outgoing connection to ${name} established\n`);
for (let kind in [ "publish", "subscribe" ])
for (let name in core[kind])
sock_data.channel.request({
method: kind,
data: { name, enabled: true },
return: "ignore",
});
};
let auth_cb = () => {
if (!sock_data.auth)
network_tx_socket_close(sock_data);
};
let cb = (req) => {
if (sock_data.auth)
return 0;
if (req.type != "hello") {
network_tx_socket_close(sock_data);
return 0;
}
let token = network_auth_token(net.name, name, req.args.id);
if (!token) {
network_tx_socket_close(sock_data);
return 0;
}
sock_data.request = sock_data.channel.defer({
method: "auth",
data: { token },
data_cb: auth_data_cb,
cb: auth_cb,
});
return 0;
};
let disconnect_cb = (req) => {
let net = networks[sock_data.network];
let cur_data = net.tx_channels[sock_data.name];
if (cur_data == sock_data)
delete net.rx_channels[sock_data.name];
network_tx_socket_close(sock_data);
};
sock_data.socket = sock;
sock_data.channel = libubus.open_channel(sock, cb, disconnect_cb);
net.tx_channels[name] = sock_data;
}
function network_connect_peers(net)
{
let n_pending = 0;
for (let name, data in net.peers) {
let chan = net.tx_channels[name];
if (chan && chan.auth)
continue;
network_open_channel(net, name, data);
n_pending++;
}
for (let name, sock_data in net.tx_channels)
if (!net.peers[name])
network_tx_socket_close(sock_data);
for (let name, sock_data in net.rx_channels)
if (!net.peers[name])
network_rx_socket_close(sock_data);
if (n_pending)
net.timer.set(10 * 1000);
}
function network_open(name, info)
{
let net = info;
net.socket = socket.listen(net.local_address, USYNC_PORT, {
family: socket.AF_INET6,
socktype: socket.SOCK_STREAM,
flags: socket.AI_NUMERICHOST,
}, null, true);
if (!net.socket) {
warn(`Failed to open socket: ${socket.error()}\n`);
return;
}
net.name = name;
net.rx_channels = {};
net.tx_channels = {};
net.socket.setopt(socket.SOL_TCP, socket.TCP_USER_TIMEOUT, 30 * 1000);
let cb = () => {
let addr = {};
let sock = net.socket.accept(addr);
if (sock)
network_accept(net, sock, addr);
};
net.handle = uloop.handle(net.socket.fileno(), cb, uloop.ULOOP_READ);
net.timer = uloop.timer(100, () => network_connect_peers(net));
networks[name] = net;
}
function network_close(name)
{
let net = networks[name];
net.timer.cancel();
net.handle.delete();
net.socket.close();
delete networks[name];
}
function network_update(name, info)
{
let net = networks[name];
if (!net)
return;
if (net.local_host != info.local_host ||
net.local_address != info.local_address) {
network_close(name);
network_open(name, info);
return;
}
for (let name, peer in net.peers) {
let allowed;
if (info.peers[name])
allowed = info.peers[name].allowed;
if (is_equal(peer.allowed, allowed))
continue;
network_rx_socket_close(net.rx_channels[name]);
network_tx_socket_close(net.tx_channels[name]);
}
net.peers = info.peers;
net.timer.set(100);
}
function unetd_network_check_peers(info)
{
let services = [];
for (let name, data in info.services) {
if (data.type == "unetmsg")
push(services, data);
}
if (!length(services))
return;
services = filter(services, (v) => index(v.members, info.local_host) >= 0);
for (let name in keys(info.peers)) {
let allowed = [];
for (let data in services) {
if (index(data.members, name) < 0)
continue;
let cur_allowed = [ "*" ];
if (data.config && data.config.allowed)
cur_allowed = data.config.allowed;
for (let cur in cur_allowed)
if (index(allowed, cur) < 0)
push(allowed, cur);
}
if (!length(allowed))
delete info.peers[name];
else
info.peers[name].allowed = allowed;
}
}
function unetd_network_update()
{
let data = ubus.call("unetd", "network_get");
if (!data || !data.networks)
return;
for (let name, info in data.networks) {
if (!info.local_host)
continue;
unetd_network_check_peers(info);
if (networks[name])
network_update(name, info);
else
network_open(name, info);
}
for (let name in networks)
if (!data.networks)
network_close(name);
}
function unetd_cb(msg)
{
if (msg.type == "network_update")
unetd_network_update();
return 0;
}
export function pubsub_set(kind, name, enabled)
{
for (let net_name, net in networks) {
for (let host_name, chan in net.tx_channels) {
if (!chan.auth)
continue;
chan.channel.request({
method: kind,
data: { name, enabled },
return: "ignore",
});
}
}
};
export function init(_core)
{
core = _core;
ubus = core.ubus;
sub = ubus.subscriber(unetd_cb);
unetd_network_update();
ev_listener = ubus.listener("ubus.object.add", (event, msg) => {
if (msg.path == "unetd")
sub.subscribe(msg.path);
});
sub.subscribe("unetd");
};

View File

@ -0,0 +1,228 @@
// SPDX-License-Identifier: GPL-2.0+
/*
* Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
*/
'use strict';
import * as client from "./unetmsgd-client.uc";
import * as remote from "./unetmsgd-remote.uc";
import { gen_id } from "./utils.uc";
function __acl_check(list, name)
{
for (let cur in list)
if (wildcard(name, cur, true))
return true;
}
function acl_check(acl_type, info, names)
{
let acl = this.acl;
if (info.user == "root")
return true;
let list = acl[acl_type][info.user] ?? [];
if (info.group) {
let list2 = acl[acl_type][":" + info.group];
if (list2)
list = [ ...list, ...list2 ];
}
for (let name in names)
if (!__acl_check(list, name))
return;
return true;
}
function new_handle(list, name, data)
{
let id = gen_id();
data._id = id;
list[name] ??= {};
list[name][id] = data;
return data;
}
function pubsub_add(kind, name, data)
{
let list = this[kind];
if (!length(list[name])) {
list[name] = {};
remote.pubsub_set(kind, name, true);
}
return new_handle(this[kind], name, data);
}
function pubsub_del(kind, name, data)
{
let list = this[kind][name];
delete list[data._id];
if (!length(list))
remote.pubsub_set(kind, name, false);
}
function get_handles(handle, local, remote)
{
let handles = [];
for (let cur_id, cur in local) {
if (handle) {
if (handle.id == cur_id)
continue;
if (handle.client && handle.client == cur.client)
continue;
}
push(handles, cur);
}
if (!remote)
return handles;
for (let cur_id, cur in remote)
push(handles, cur);
return handles;
}
function handle_request(handle, req, data, remote)
{
let name = data.name;
let local = this.publish[name];
if (remote)
remote = this.remote_publish[name];
let handles = get_handles(handle, local, remote);
let context = {
pending: length(handles),
req
};
if (!context.pending)
return 0;
req.defer();
let cb = (ret) => {
if (--context.pending > 0)
return;
req.reply();
};
for (let cur in handles) {
if (!cur || !cur.get_channel) {
continue;
}
let chan = cur.get_channel();
if (!chan) {
cb();
continue;
}
let cur_handle = cur;
let data_cb = (msg) => {
if (cur_handle.get_response_data)
msg = cur.get_response_data(msg);
req.reply(msg, -1);
};
chan.defer({
method: "request",
data, cb, data_cb
});
}
}
function handle_message(handle, data, remote)
{
let name = data.name;
let local = this.subscribe[name];
if (remote)
remote = this.remote_subscribe[name];
let handles = get_handles(handle, local, remote);
for (let cur in handles) {
if (!cur || !cur.get_channel)
continue;
let chan = cur.get_channel();
if (!chan)
continue;
chan.request({
method: "message",
return: "ignore",
data,
});
}
return 0;
}
function add_acl(type, user, data)
{
if (!data || !user)
return;
type[user] ??= [];
let list = type[user];
for (let cur in data)
if (index(list, data) < 0)
push(list, cur);
}
function acl_set(acl_data)
{
let acl = this.acl = {
publish: {},
subscribe: {},
};
for (let cur in acl_data) {
if (cur.obj != "unetmsg" || !cur.acl)
continue;
if (cur.group)
cur.group = ":" + cur.group;
for (let user in [ cur.user, cur.group ]) {
add_acl(acl.publish, user, cur.acl.publish);
add_acl(acl.subscribe, user, cur.acl.publish);
add_acl(acl.subscribe, user, cur.acl.subscribe);
}
}
};
const core_proto = {
acl_check,
acl_set,
pubsub_add,
pubsub_del,
handle_request,
handle_message,
dbg: function(msg) {
if (this.debug_enabled)
warn(msg);
},
exception: function(e) {
this.dbg(`Exception: ${e}\n${e.stacktrace[0].context}`);
}
};
export function init(ubus, debug_enabled)
{
let data = proto({
clients: {},
publish: {},
subscribe: {},
remote_publish: {},
remote_subscribe: {},
client,
remote,
ubus,
debug_enabled
}, core_proto);
client.set_core(data);
remote.init(data);
return data;
};

View File

@ -0,0 +1,40 @@
// SPDX-License-Identifier: GPL-2.0+
/*
* Copyright (C) 2025 Felix Fietkau <nbd@nbd.name>
*/
'use strict';
import { open } from "fs";
export function is_equal(val1, val2) {
let t1 = type(val1);
if (t1 != type(val2))
return false;
if (t1 == "array") {
if (length(val1) != length(val2))
return false;
for (let i = 0; i < length(val1); i++)
if (!is_equal(val1[i], val2[i]))
return false;
return true;
} else if (t1 == "object") {
for (let key in val1)
if (!is_equal(val1[key], val2[key]))
return false;
for (let key in val2)
if (val1[key] == null)
return false;
return true;
} else {
return val1 == val2;
}
};
export function gen_id()
{
let id = open("/dev/urandom").read(12);
return join("", map(split(id, ""), (v) => sprintf("%02x", ord(v))));
};