rabbitMQ implementation

This commit is contained in:
Grant Limberg 2019-03-05 15:11:50 -08:00
parent 1f13374a4f
commit fcb4221f97
9 changed files with 249 additions and 17 deletions

View File

@ -464,12 +464,13 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule)
} // anonymous namespace
EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort) :
EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc) :
_startTime(OSUtils::now()),
_listenPort(listenPort),
_node(node),
_path(dbPath),
_sender((NetworkController::Sender *)0)
_sender((NetworkController::Sender *)0),
_mqc(mqc)
{
}
@ -489,7 +490,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
_signingIdAddressString = signingId.address().toString(tmp);
#ifdef ZT_CONTROLLER_USE_LIBPQ
if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:"))
_db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort));
_db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc));
else // else use FileDB after endif
#endif
_db.reset(new FileDB(this,_signingId,_path.c_str()));

View File

@ -60,6 +60,8 @@ namespace ZeroTier {
class Node;
struct MQConfig;
class EmbeddedNetworkController : public NetworkController
{
public:
@ -67,7 +69,7 @@ public:
* @param node Parent node
* @param dbPath Database path (file path or database credentials)
*/
EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort);
EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc = NULL);
virtual ~EmbeddedNetworkController();
virtual void init(const Identity &signingId,Sender *sender);
@ -164,6 +166,8 @@ private:
std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus;
std::mutex _memberStatus_l;
MQConfig *_mqc;
};
} // namespace ZeroTier

View File

@ -28,6 +28,7 @@
#include "PostgreSQL.hpp"
#include "EmbeddedNetworkController.hpp"
#include "RabbitMQ.hpp"
#include "../version.h"
#include <libpq-fe.h>
@ -74,7 +75,7 @@ std::string join(const std::vector<std::string> &elements, const char * const se
using namespace ZeroTier;
PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc)
PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
: DB(nc, myId, path)
, _ready(0)
, _connected(1)
@ -646,7 +647,32 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
}
void PostgreSQL::_membersWatcher_RabbitMQ() {
char buf[11] = {0};
std::string qname = "member_"+ std::string(_myAddress.toString(buf));
RabbitMQ rmq(_mqc, qname.c_str());
try {
rmq.init();
} catch (std::runtime_error &e) {
fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
exit(11);
}
while (_run == 1) {
try {
std::string msg = rmq.consume();
json tmp(json::parse(msg));
json &ov = tmp["old_val"];
json &nv = tmp["new_val"];
json oldConfig, newConfig;
if (ov.is_object()) oldConfig = ov;
if (nv.is_object()) newConfig = nv;
if (oldConfig.is_object() || newConfig.is_object()) {
_memberChanged(oldConfig,newConfig,(this->_ready>=2));
}
} catch (std::runtime_error &e) {
fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
break;
} catch(...) {}
}
}
void PostgreSQL::networksDbWatcher()
@ -714,7 +740,32 @@ void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) {
}
void PostgreSQL::_networksWatcher_RabbitMQ() {
char buf[11] = {0};
std::string qname = "network_"+ std::string(_myAddress.toString(buf));
RabbitMQ rmq(_mqc, qname.c_str());
try {
rmq.init();
} catch (std::runtime_error &e) {
fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
exit(11);
}
while (_run == 1) {
try {
std::string msg = rmq.consume();
json tmp(json::parse(msg));
json &ov = tmp["old_val"];
json &nv = tmp["new_val"];
json oldConfig, newConfig;
if (ov.is_object()) oldConfig = ov;
if (nv.is_object()) newConfig = nv;
if (oldConfig.is_object()||newConfig.is_object()) {
_networkChanged(oldConfig,newConfig,(this->_ready >= 2));
}
} catch (std::runtime_error &e) {
fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
break;
} catch(...) {}
}
}
void PostgreSQL::commitThread()

View File

@ -40,13 +40,7 @@ extern "C" {
namespace ZeroTier
{
struct mq_config
{
const char *host;
int port;
const char *username;
const char *password;
};
struct MQConfig;
/**
* A controller database driver that talks to PostgreSQL
@ -57,7 +51,7 @@ struct mq_config
class PostgreSQL : public DB
{
public:
PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc = NULL);
PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
virtual ~PostgreSQL();
virtual bool waitForReady();
@ -115,7 +109,7 @@ private:
int _listenPort;
mq_config *_mqc;
MQConfig *_mqc;
};
}

91
controller/RabbitMQ.cpp Normal file
View File

@ -0,0 +1,91 @@
#include "RabbitMQ.hpp"
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <stdexcept>
#include <cstring>
namespace ZeroTier
{
RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
: _mqc(cfg)
, _qName(queueName)
, _socket(NULL)
, _status(0)
{
}
RabbitMQ::~RabbitMQ()
{
amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(_conn);
}
void RabbitMQ::init()
{
struct timeval tval;
memset(&tval, 0, sizeof(struct timeval));
tval.tv_sec = 5;
fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
_conn = amqp_new_connection();
_socket = amqp_tcp_socket_new(_conn);
if (!_socket) {
throw std::runtime_error("Can't create socket for RabbitMQ");
}
_status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
if (_status) {
throw std::runtime_error("Can't connect to RabbitMQ");
}
amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
_mqc->username, _mqc->password);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("RabbitMQ Login Error");
}
static int chan = 0;
amqp_channel_open(_conn, ++chan);
r = amqp_get_rpc_reply(_conn);
if(r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error opening communication channel");
}
_channel = chan;
_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
r = amqp_get_rpc_reply(_conn);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error declaring queue");
}
amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
r = amqp_get_rpc_reply(_conn);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error conuming");
}
fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
}
std::string RabbitMQ::consume()
{
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(_conn);
res = amqp_consume_message(_conn, &envelope, NULL, 0);
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error getting message");
}
std::string msg(
(const char*)envelope.message.body.bytes,
envelope.message.body.len
);
amqp_destroy_envelope(&envelope);
return msg;
}
}

75
controller/RabbitMQ.hpp Normal file
View File

@ -0,0 +1,75 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* You can be released from the requirements of the license by purchasing
* a commercial license. Buying such a license is mandatory as soon as you
* develop commercial closed-source software that incorporates or links
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#ifndef ZT_CONTROLLER_RABBITMQ_HPP
#define ZT_CONTROLLER_RABBITMQ_HPP
namespace ZeroTier
{
struct MQConfig {
const char *host;
int port;
const char *username;
const char *password;
};
}
#ifdef ZT_CONTROLLER_USE_LIBPQ
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <string>
namespace ZeroTier
{
class RabbitMQ {
public:
RabbitMQ(MQConfig *cfg, const char *queueName);
~RabbitMQ();
void init();
std::string consume();
private:
MQConfig *_mqc;
const char *_qName;
amqp_socket_t *_socket;
amqp_connection_state_t _conn;
amqp_queue_declare_ok_t *_q;
int _status;
int _channel;
};
}
#endif // ZT_CONTROLLER_USE_LIBPQ
#endif // ZT_CONTROLLER_RABBITMQ_HPP

View File

@ -31,6 +31,7 @@ ONE_OBJS=\
controller/DB.o \
controller/FileDB.o \
controller/PostgreSQL.o \
controller/RabbitMQ.o \
osdep/ManagedRoute.o \
osdep/Http.o \
osdep/OSUtils.o \

View File

@ -99,6 +99,7 @@ extern "C" {
using json = nlohmann::json;
#include "../controller/EmbeddedNetworkController.hpp"
#include "../controller/RabbitMQ.hpp"
#ifdef ZT_USE_TEST_TAP
@ -569,6 +570,8 @@ public:
volatile bool _run;
Mutex _run_m;
MQConfig *_mqc;
// end member variables ----------------------------------------------------
OneServiceImpl(const char *hp,unsigned int port) :
@ -604,6 +607,7 @@ public:
,_vaultPath("cubbyhole/zerotier")
#endif
,_run(true)
,_mqc(NULL)
{
_ports[0] = 0;
_ports[1] = 0;
@ -678,6 +682,7 @@ public:
delete _portMapper;
#endif
delete _controller;
delete _mqc;
}
virtual ReasonForTermination run()
@ -809,7 +814,7 @@ public:
OSUtils::rmDashRf((_homePath + ZT_PATH_SEPARATOR_S "iddb.d").c_str());
// Network controller is now enabled by default for desktop and server
_controller = new EmbeddedNetworkController(_node,_controllerDbPath.c_str(),_ports[0]);
_controller = new EmbeddedNetworkController(_node,_controllerDbPath.c_str(),_ports[0], _mqc);
_node->setNetconfMaster((void *)_controller);
// Join existing networks in networks.d
@ -1073,6 +1078,16 @@ public:
if (cdbp.length() > 0)
_controllerDbPath = cdbp;
json &rmq = settings["rabbitmq"];
if (rmq.is_object() && _mqc == NULL) {
fprintf(stderr, "Reading RabbitMQ Config\n");
_mqc = new MQConfig;
_mqc->port = rmq["port"];
_mqc->host = OSUtils::jsonString(rmq["host"], "").c_str();
_mqc->username = OSUtils::jsonString(rmq["username"], "").c_str();
_mqc->password = OSUtils::jsonString(rmq["password"], "").c_str();
}
// Bind to wildcard instead of to specific interfaces (disables full tunnel capability)
json &bind = settings["bind"];
if (bind.is_array()) {