diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 8b3f1517f..5ddcaa4b4 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -464,12 +464,13 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) } // anonymous namespace -EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort) : +EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc) : _startTime(OSUtils::now()), _listenPort(listenPort), _node(node), _path(dbPath), - _sender((NetworkController::Sender *)0) + _sender((NetworkController::Sender *)0), + _mqc(mqc) { } @@ -489,7 +490,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender) _signingIdAddressString = signingId.address().toString(tmp); #ifdef ZT_CONTROLLER_USE_LIBPQ if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) - _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort)); + _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc)); else // else use FileDB after endif #endif _db.reset(new FileDB(this,_signingId,_path.c_str())); diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 718c97ffd..269442a87 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -60,6 +60,8 @@ namespace ZeroTier { class Node; +struct MQConfig; + class EmbeddedNetworkController : public NetworkController { public: @@ -67,7 +69,7 @@ public: * @param node Parent node * @param dbPath Database path (file path or database credentials) */ - EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort); + EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc = NULL); virtual ~EmbeddedNetworkController(); virtual void init(const Identity &signingId,Sender *sender); @@ -164,6 +166,8 @@ private: std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus; std::mutex _memberStatus_l; + + MQConfig *_mqc; }; } // namespace ZeroTier diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 8dc56c9cc..5192dad5d 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -28,6 +28,7 @@ #include "PostgreSQL.hpp" #include "EmbeddedNetworkController.hpp" +#include "RabbitMQ.hpp" #include "../version.h" #include @@ -74,7 +75,7 @@ std::string join(const std::vector &elements, const char * const se using namespace ZeroTier; -PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc) +PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc) : DB(nc, myId, path) , _ready(0) , _connected(1) @@ -646,7 +647,32 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { } void PostgreSQL::_membersWatcher_RabbitMQ() { - + char buf[11] = {0}; + std::string qname = "member_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready>=2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + break; + } catch(...) {} + } } void PostgreSQL::networksDbWatcher() @@ -714,7 +740,32 @@ void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { } void PostgreSQL::_networksWatcher_RabbitMQ() { - + char buf[11] = {0}; + std::string qname = "network_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + break; + } catch(...) {} + } } void PostgreSQL::commitThread() diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index e0dcdf06f..779d47bd5 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -40,13 +40,7 @@ extern "C" { namespace ZeroTier { -struct mq_config -{ - const char *host; - int port; - const char *username; - const char *password; -}; +struct MQConfig; /** * A controller database driver that talks to PostgreSQL @@ -57,7 +51,7 @@ struct mq_config class PostgreSQL : public DB { public: - PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc = NULL); + PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL); virtual ~PostgreSQL(); virtual bool waitForReady(); @@ -115,7 +109,7 @@ private: int _listenPort; - mq_config *_mqc; + MQConfig *_mqc; }; } diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp new file mode 100644 index 000000000..34efcde40 --- /dev/null +++ b/controller/RabbitMQ.cpp @@ -0,0 +1,91 @@ +#include "RabbitMQ.hpp" + +#include +#include +#include +#include + +namespace ZeroTier +{ + +RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName) + : _mqc(cfg) + , _qName(queueName) + , _socket(NULL) + , _status(0) +{ +} + +RabbitMQ::~RabbitMQ() +{ + amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS); + amqp_connection_close(_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(_conn); +} + +void RabbitMQ::init() +{ + struct timeval tval; + memset(&tval, 0, sizeof(struct timeval)); + tval.tv_sec = 5; + + fprintf(stderr, "Initializing RabbitMQ %s\n", _qName); + _conn = amqp_new_connection(); + _socket = amqp_tcp_socket_new(_conn); + if (!_socket) { + throw std::runtime_error("Can't create socket for RabbitMQ"); + } + + _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval); + if (_status) { + throw std::runtime_error("Can't connect to RabbitMQ"); + } + + amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, + _mqc->username, _mqc->password); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("RabbitMQ Login Error"); + } + + static int chan = 0; + amqp_channel_open(_conn, ++chan); + r = amqp_get_rpc_reply(_conn); + if(r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error opening communication channel"); + } + _channel = chan; + + _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error declaring queue"); + } + + amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error conuming"); + } + fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); +} + +std::string RabbitMQ::consume() +{ + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(_conn); + + res = amqp_consume_message(_conn, &envelope, NULL, 0); + if (res.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error getting message"); + } + + std::string msg( + (const char*)envelope.message.body.bytes, + envelope.message.body.len + ); + amqp_destroy_envelope(&envelope); + return msg; +} + +} \ No newline at end of file diff --git a/controller/RabbitMQ.hpp b/controller/RabbitMQ.hpp new file mode 100644 index 000000000..74023b124 --- /dev/null +++ b/controller/RabbitMQ.hpp @@ -0,0 +1,75 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. + */ +#ifndef ZT_CONTROLLER_RABBITMQ_HPP +#define ZT_CONTROLLER_RABBITMQ_HPP + +namespace ZeroTier +{ +struct MQConfig { + const char *host; + int port; + const char *username; + const char *password; +}; +} + + +#ifdef ZT_CONTROLLER_USE_LIBPQ + +#include +#include +#include + +namespace ZeroTier +{ + +class RabbitMQ { +public: + RabbitMQ(MQConfig *cfg, const char *queueName); + ~RabbitMQ(); + + void init(); + + std::string consume(); + +private: + MQConfig *_mqc; + const char *_qName; + + amqp_socket_t *_socket; + amqp_connection_state_t _conn; + amqp_queue_declare_ok_t *_q; + int _status; + + int _channel; +}; + +} + +#endif // ZT_CONTROLLER_USE_LIBPQ + +#endif // ZT_CONTROLLER_RABBITMQ_HPP + diff --git a/ext/librabbitmq/centos_x64/lib/librabbitmq.a b/ext/librabbitmq/centos_x64/lib/librabbitmq.a index 8fc39d06b..9495fa7b8 100644 Binary files a/ext/librabbitmq/centos_x64/lib/librabbitmq.a and b/ext/librabbitmq/centos_x64/lib/librabbitmq.a differ diff --git a/objects.mk b/objects.mk index c15fe299d..eb348dca2 100644 --- a/objects.mk +++ b/objects.mk @@ -31,6 +31,7 @@ ONE_OBJS=\ controller/DB.o \ controller/FileDB.o \ controller/PostgreSQL.o \ + controller/RabbitMQ.o \ osdep/ManagedRoute.o \ osdep/Http.o \ osdep/OSUtils.o \ diff --git a/service/OneService.cpp b/service/OneService.cpp index b66731982..0fd31c640 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -99,6 +99,7 @@ extern "C" { using json = nlohmann::json; #include "../controller/EmbeddedNetworkController.hpp" +#include "../controller/RabbitMQ.hpp" #ifdef ZT_USE_TEST_TAP @@ -569,6 +570,8 @@ public: volatile bool _run; Mutex _run_m; + MQConfig *_mqc; + // end member variables ---------------------------------------------------- OneServiceImpl(const char *hp,unsigned int port) : @@ -604,6 +607,7 @@ public: ,_vaultPath("cubbyhole/zerotier") #endif ,_run(true) + ,_mqc(NULL) { _ports[0] = 0; _ports[1] = 0; @@ -678,6 +682,7 @@ public: delete _portMapper; #endif delete _controller; + delete _mqc; } virtual ReasonForTermination run() @@ -809,7 +814,7 @@ public: OSUtils::rmDashRf((_homePath + ZT_PATH_SEPARATOR_S "iddb.d").c_str()); // Network controller is now enabled by default for desktop and server - _controller = new EmbeddedNetworkController(_node,_controllerDbPath.c_str(),_ports[0]); + _controller = new EmbeddedNetworkController(_node,_controllerDbPath.c_str(),_ports[0], _mqc); _node->setNetconfMaster((void *)_controller); // Join existing networks in networks.d @@ -1073,6 +1078,16 @@ public: if (cdbp.length() > 0) _controllerDbPath = cdbp; + json &rmq = settings["rabbitmq"]; + if (rmq.is_object() && _mqc == NULL) { + fprintf(stderr, "Reading RabbitMQ Config\n"); + _mqc = new MQConfig; + _mqc->port = rmq["port"]; + _mqc->host = OSUtils::jsonString(rmq["host"], "").c_str(); + _mqc->username = OSUtils::jsonString(rmq["username"], "").c_str(); + _mqc->password = OSUtils::jsonString(rmq["password"], "").c_str(); + } + // Bind to wildcard instead of to specific interfaces (disables full tunnel capability) json &bind = settings["bind"]; if (bind.is_array()) {