From 37d508ab969afaf16c2aee1838a225022de34177 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 6 Aug 2019 07:51:50 -0500 Subject: [PATCH] Refactoring in prep for mirroring --- controller/DB.cpp | 6 +- controller/DB.hpp | 11 +- controller/EmbeddedNetworkController.cpp | 6 +- controller/EmbeddedNetworkController.hpp | 6 +- controller/FileDB.cpp | 2 +- controller/LFDB.cpp | 2 +- controller/PostgreSQL.cpp | 29 +++-- controller/PostgreSQL.hpp | 85 +++++++------- controller/RabbitMQ.cpp | 134 +++++++++++------------ controller/RabbitMQ.hpp | 32 +++--- 10 files changed, 161 insertions(+), 152 deletions(-) diff --git a/controller/DB.cpp b/controller/DB.cpp index bb734dc85..75adf53e5 100644 --- a/controller/DB.cpp +++ b/controller/DB.cpp @@ -313,7 +313,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in if (initialized) { std::lock_guard ll(_changeListeners_l); for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { - (*i)->onNetworkMemberUpdate(networkId,memberId,memberConfig); + (*i)->onNetworkMemberUpdate(this,networkId,memberId,memberConfig); } } } else if (memberId) { @@ -336,7 +336,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) { std::lock_guard ll(_changeListeners_l); for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { - (*i)->onNetworkMemberDeauthorize(networkId,memberId); + (*i)->onNetworkMemberDeauthorize(this,networkId,memberId); } } } @@ -362,7 +362,7 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool if (initialized) { std::lock_guard ll(_changeListeners_l); for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { - (*i)->onNetworkUpdate(networkId,networkConfig); + (*i)->onNetworkUpdate(this,networkId,networkConfig); } } } diff --git a/controller/DB.hpp b/controller/DB.hpp index 85920eecd..461f385e6 100644 --- a/controller/DB.hpp +++ b/controller/DB.hpp @@ -58,10 +58,10 @@ public: public: ChangeListener() {} virtual ~ChangeListener() {} - virtual void onNetworkUpdate(uint64_t networkId,const nlohmann::json &network) {} - virtual void onNetworkMemberUpdate(uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {} - virtual void onNetworkMemberDeauthorize(uint64_t networkId,uint64_t memberId) {} - virtual void onNetworkMemberOnline(uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress) {} + virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network) {} + virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {} + virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId) {} + virtual void onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress) {} }; struct NetworkSummaryInfo @@ -95,12 +95,15 @@ public: bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member); bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); bool get(const uint64_t networkId,nlohmann::json &network,std::vector &members); + bool summary(const uint64_t networkId,NetworkSummaryInfo &info); void networks(std::vector &networks); virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0; + virtual void eraseNetwork(const uint64_t networkId) = 0; virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0; + virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0; inline void addListener(DB::ChangeListener *const listener) diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index bf5685271..80331578c 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -1190,7 +1190,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) } } -void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network) +void EmbeddedNetworkController::onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network) { // Send an update to all members of the network that are online const int64_t now = OSUtils::now(); @@ -1201,7 +1201,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const n } } -void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member) +void EmbeddedNetworkController::onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) { // Push update to member if online try { @@ -1212,7 +1212,7 @@ void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,c } catch ( ... ) {} } -void EmbeddedNetworkController::onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId) +void EmbeddedNetworkController::onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId) { const int64_t now = OSUtils::now(); Revocation rev((uint32_t)_node->prng(),networkId,0,now,ZT_REVOCATION_FLAG_FAST_PROPAGATE,Address(memberId),Revocation::CREDENTIAL_TYPE_COM); diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 7bc37be21..c0f14f8b6 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -101,9 +101,9 @@ public: void handleRemoteTrace(const ZT_RemoteTrace &rt); - virtual void onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network); - virtual void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member); - virtual void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId); + virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network); + virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member); + virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId); private: void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary &metaData); diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp index 484aefa57..acc8680e6 100644 --- a/controller/FileDB.cpp +++ b/controller/FileDB.cpp @@ -178,7 +178,7 @@ void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const { std::lock_guard l2(_changeListeners_l); for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) - (*i)->onNetworkMemberOnline(networkId,memberId,physicalAddress); + (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress); } } diff --git a/controller/LFDB.cpp b/controller/LFDB.cpp index a7bbf81d5..9203a5a18 100644 --- a/controller/LFDB.cpp +++ b/controller/LFDB.cpp @@ -404,7 +404,7 @@ void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const I { std::lock_guard l2(_changeListeners_l); for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) - (*i)->onNetworkMemberOnline(networkId,memberId,physicalAddress); + (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress); } } diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 709712b5f..45be3e518 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -24,9 +24,11 @@ * of your own application. */ +#include "PostgreSQL.hpp" + #ifdef ZT_CONTROLLER_USE_LIBPQ -#include "PostgreSQL.hpp" +#include "../node/Constants.hpp" #include "EmbeddedNetworkController.hpp" #include "RabbitMQ.hpp" #include "../version.h" @@ -37,6 +39,7 @@ #include using json = nlohmann::json; + namespace { static const int DB_MINIMUM_VERSION = 5; @@ -73,16 +76,16 @@ std::string join(const std::vector &elements, const char * const se } } -} +} // anonymous namespace using namespace ZeroTier; PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc) - : DB(myId, path) - , _ready(0) + : DB(myId, path) + , _ready(0) , _connected(1) - , _run(1) - , _waitNoticePrinted(false) + , _run(1) + , _waitNoticePrinted(false) , _listenPort(listenPort) , _mqc(mqc) { @@ -221,7 +224,7 @@ void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, { std::lock_guard l2(_changeListeners_l); for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) - (*i)->onNetworkMemberOnline(networkId,memberId,physicalAddress); + (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress); } } @@ -602,8 +605,8 @@ void PostgreSQL::heartbeat() "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, " "use_rabbitmq = EXCLUDED.use_rabbitmq", - 10, // number of parameters - NULL, // oid field. ignore + 10, // number of parameters + NULL, // oid field. ignore values, // values for substitution NULL, // lengths in bytes of each value NULL, // binary? @@ -724,7 +727,7 @@ void PostgreSQL::_membersWatcher_RabbitMQ() { fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what()); } catch(...) { fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n"); - } + } } } @@ -1324,7 +1327,7 @@ void PostgreSQL::onlineNotificationThread() int64_t lastUpdatedNetworkStatus = 0; std::unordered_map< std::pair,int64_t,_PairHasher > lastOnlineCumulative; - + while (_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); @@ -1438,7 +1441,8 @@ void PostgreSQL::onlineNotificationThread() } } -PGconn *PostgreSQL::getPgConn(OverrideMode m) { +PGconn *PostgreSQL::getPgConn(OverrideMode m) +{ if (m == ALLOW_PGBOUNCER_OVERRIDE) { char *connStr = getenv("PGBOUNCER_CONNSTR"); if (connStr != NULL) { @@ -1452,4 +1456,5 @@ PGconn *PostgreSQL::getPgConn(OverrideMode m) { return PQconnectdb(_connString.c_str()); } + #endif //ZT_CONTROLLER_USE_LIBPQ diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index f35f89fc9..fe69635d1 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -24,6 +24,8 @@ * of your own application. */ +#define ZT_CONTROLLER_USE_LIBPQ + #ifdef ZT_CONTROLLER_USE_LIBPQ #ifndef ZT_CONTROLLER_LIBPQ_HPP @@ -34,11 +36,10 @@ #define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4 extern "C" { - typedef struct pg_conn PGconn; +typedef struct pg_conn PGconn; } -namespace ZeroTier -{ +namespace ZeroTier { struct MQConfig; @@ -51,66 +52,66 @@ struct MQConfig; class PostgreSQL : public DB { public: - PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL); - virtual ~PostgreSQL(); + PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL); + virtual ~PostgreSQL(); - virtual bool waitForReady(); - virtual bool isReady(); - virtual void save(nlohmann::json *orig, nlohmann::json &record); - virtual void eraseNetwork(const uint64_t networkId); - virtual void eraseMember(const uint64_t networkId, const uint64_t memberId); - virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress); + virtual bool waitForReady(); + virtual bool isReady(); + virtual void save(nlohmann::json *orig, nlohmann::json &record); + virtual void eraseNetwork(const uint64_t networkId); + virtual void eraseMember(const uint64_t networkId, const uint64_t memberId); + virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress); protected: - struct _PairHasher + struct _PairHasher { inline std::size_t operator()(const std::pair &p) const { return (std::size_t)(p.first ^ p.second); } }; private: - void initializeNetworks(PGconn *conn); - void initializeMembers(PGconn *conn); - void heartbeat(); - void membersDbWatcher(); - void _membersWatcher_Postgres(PGconn *conn); - void _membersWatcher_RabbitMQ(); - void networksDbWatcher(); - void _networksWatcher_Postgres(PGconn *conn); - void _networksWatcher_RabbitMQ(); + void initializeNetworks(PGconn *conn); + void initializeMembers(PGconn *conn); + void heartbeat(); + void membersDbWatcher(); + void _membersWatcher_Postgres(PGconn *conn); + void _membersWatcher_RabbitMQ(); + void networksDbWatcher(); + void _networksWatcher_Postgres(PGconn *conn); + void _networksWatcher_RabbitMQ(); - void commitThread(); - void onlineNotificationThread(); + void commitThread(); + void onlineNotificationThread(); - enum OverrideMode { - ALLOW_PGBOUNCER_OVERRIDE = 0, - NO_OVERRIDE = 1 - }; + enum OverrideMode { + ALLOW_PGBOUNCER_OVERRIDE = 0, + NO_OVERRIDE = 1 + }; - PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE ); + PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE ); - std::string _connString; + std::string _connString; - BlockingQueue _commitQueue; + BlockingQueue _commitQueue; - std::thread _heartbeatThread; - std::thread _membersDbWatcher; - std::thread _networksDbWatcher; - std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS]; - std::thread _onlineNotificationThread; + std::thread _heartbeatThread; + std::thread _membersDbWatcher; + std::thread _networksDbWatcher; + std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS]; + std::thread _onlineNotificationThread; std::unordered_map< std::pair,std::pair,_PairHasher > _lastOnline; - mutable std::mutex _lastOnline_l; - mutable std::mutex _readyLock; - std::atomic _ready, _connected, _run; - mutable volatile bool _waitNoticePrinted; + mutable std::mutex _lastOnline_l; + mutable std::mutex _readyLock; + std::atomic _ready, _connected, _run; + mutable volatile bool _waitNoticePrinted; - int _listenPort; + int _listenPort; - MQConfig *_mqc; + MQConfig *_mqc; }; -} +} // namespace ZeroTier #endif // ZT_CONTROLLER_LIBPQ_HPP diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp index eec9745dc..cf5c567d7 100644 --- a/controller/RabbitMQ.cpp +++ b/controller/RabbitMQ.cpp @@ -11,95 +11,95 @@ namespace ZeroTier { RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName) - : _mqc(cfg) - , _qName(queueName) - , _socket(NULL) - , _status(0) + : _mqc(cfg) + , _qName(queueName) + , _socket(NULL) + , _status(0) { } RabbitMQ::~RabbitMQ() { - amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS); - amqp_connection_close(_conn, AMQP_REPLY_SUCCESS); - amqp_destroy_connection(_conn); + amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS); + amqp_connection_close(_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(_conn); } void RabbitMQ::init() { - struct timeval tval; - memset(&tval, 0, sizeof(struct timeval)); - tval.tv_sec = 5; + struct timeval tval; + memset(&tval, 0, sizeof(struct timeval)); + tval.tv_sec = 5; - fprintf(stderr, "Initializing RabbitMQ %s\n", _qName); - _conn = amqp_new_connection(); - _socket = amqp_tcp_socket_new(_conn); - if (!_socket) { - throw std::runtime_error("Can't create socket for RabbitMQ"); - } - - _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval); - if (_status) { - throw std::runtime_error("Can't connect to RabbitMQ"); - } - - amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, - _mqc->username, _mqc->password); - if (r.reply_type != AMQP_RESPONSE_NORMAL) { - throw std::runtime_error("RabbitMQ Login Error"); - } + fprintf(stderr, "Initializing RabbitMQ %s\n", _qName); + _conn = amqp_new_connection(); + _socket = amqp_tcp_socket_new(_conn); + if (!_socket) { + throw std::runtime_error("Can't create socket for RabbitMQ"); + } + + _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval); + if (_status) { + throw std::runtime_error("Can't connect to RabbitMQ"); + } + + amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, + _mqc->username, _mqc->password); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("RabbitMQ Login Error"); + } - static int chan = 0; + static int chan = 0; { Mutex::Lock l(_chan_m); - _channel = ++chan; + _channel = ++chan; + } + amqp_channel_open(_conn, _channel); + r = amqp_get_rpc_reply(_conn); + if(r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error opening communication channel"); + } + + _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error declaring queue " + std::string(_qName)); } - amqp_channel_open(_conn, _channel); - r = amqp_get_rpc_reply(_conn); - if(r.reply_type != AMQP_RESPONSE_NORMAL) { - throw std::runtime_error("Error opening communication channel"); - } - - _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); - r = amqp_get_rpc_reply(_conn); - if (r.reply_type != AMQP_RESPONSE_NORMAL) { - throw std::runtime_error("Error declaring queue " + std::string(_qName)); - } - amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); - r = amqp_get_rpc_reply(_conn); - if (r.reply_type != AMQP_RESPONSE_NORMAL) { - throw std::runtime_error("Error consuming queue " + std::string(_qName)); - } - fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); + amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error consuming queue " + std::string(_qName)); + } + fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); } std::string RabbitMQ::consume() { - amqp_rpc_reply_t res; - amqp_envelope_t envelope; - amqp_maybe_release_buffers(_conn); + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(_conn); - struct timeval timeout; - timeout.tv_sec = 1; - timeout.tv_usec = 0; + struct timeval timeout; + timeout.tv_sec = 1; + timeout.tv_usec = 0; - res = amqp_consume_message(_conn, &envelope, &timeout, 0); - if (res.reply_type != AMQP_RESPONSE_NORMAL) { - if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) { - // timeout waiting for message. Return empty string - return ""; - } else { - throw std::runtime_error("Error getting message"); - } - } + res = amqp_consume_message(_conn, &envelope, &timeout, 0); + if (res.reply_type != AMQP_RESPONSE_NORMAL) { + if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) { + // timeout waiting for message. Return empty string + return ""; + } else { + throw std::runtime_error("Error getting message"); + } + } - std::string msg( - (const char*)envelope.message.body.bytes, - envelope.message.body.len - ); - amqp_destroy_envelope(&envelope); - return msg; + std::string msg( + (const char*)envelope.message.body.bytes, + envelope.message.body.len + ); + amqp_destroy_envelope(&envelope); + return msg; } } diff --git a/controller/RabbitMQ.hpp b/controller/RabbitMQ.hpp index d341681be..6bac68daa 100644 --- a/controller/RabbitMQ.hpp +++ b/controller/RabbitMQ.hpp @@ -23,16 +23,17 @@ * directly against ZeroTier software without disclosing the source code * of your own application. */ + #ifndef ZT_CONTROLLER_RABBITMQ_HPP #define ZT_CONTROLLER_RABBITMQ_HPP namespace ZeroTier { struct MQConfig { - const char *host; - int port; - const char *username; - const char *password; + const char *host; + int port; + const char *username; + const char *password; }; } @@ -49,26 +50,25 @@ namespace ZeroTier class RabbitMQ { public: - RabbitMQ(MQConfig *cfg, const char *queueName); - ~RabbitMQ(); + RabbitMQ(MQConfig *cfg, const char *queueName); + ~RabbitMQ(); - void init(); + void init(); - std::string consume(); + std::string consume(); private: - MQConfig *_mqc; - const char *_qName; + MQConfig *_mqc; + const char *_qName; - amqp_socket_t *_socket; - amqp_connection_state_t _conn; - amqp_queue_declare_ok_t *_q; - int _status; + amqp_socket_t *_socket; + amqp_connection_state_t _conn; + amqp_queue_declare_ok_t *_q; + int _status; - int _channel; + int _channel; Mutex _chan_m; - }; }