diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 29d61a39c..8dc56c9cc 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -74,13 +74,14 @@ std::string join(const std::vector &elements, const char * const se using namespace ZeroTier; -PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort) +PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc) : DB(nc, myId, path) , _ready(0) , _connected(1) , _run(1) , _waitNoticePrinted(false) , _listenPort(listenPort) + , _mqc(mqc) { _connString = std::string(path) + " application_name=controller_" +_myAddressStr; @@ -601,6 +602,21 @@ void PostgreSQL::membersDbWatcher() PQclear(res); res = NULL; + if (this->_mqc != NULL) { + _membersWatcher_RabbitMQ(); + } else { + _membersWatcher_Postgres(conn); + PQfinish(conn); + conn = NULL; + } + + if (_run == 1) { + fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(9); + } +} + +void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); @@ -627,12 +643,10 @@ void PostgreSQL::membersDbWatcher() } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - PQfinish(conn); - conn = NULL; - if (_run == 1) { - fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); - exit(9); - } +} + +void PostgreSQL::_membersWatcher_RabbitMQ() { + } void PostgreSQL::networksDbWatcher() @@ -658,6 +672,21 @@ void PostgreSQL::networksDbWatcher() PQclear(res); res = NULL; + if (this->_mqc != NULL) { + _networksWatcher_RabbitMQ(); + } else { + _networksWatcher_Postgres(conn); + PQfinish(conn); + conn = NULL; + } + + if (_run == 1) { + fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(8); + } +} + +void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); @@ -682,12 +711,10 @@ void PostgreSQL::networksDbWatcher() } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - PQfinish(conn); - conn = NULL; - if (_run == 1) { - fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); - exit(8); - } +} + +void PostgreSQL::_networksWatcher_RabbitMQ() { + } void PostgreSQL::commitThread() diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index 7932317b9..e0dcdf06f 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -40,6 +40,14 @@ extern "C" { namespace ZeroTier { +struct mq_config +{ + const char *host; + int port; + const char *username; + const char *password; +}; + /** * A controller database driver that talks to PostgreSQL * @@ -49,7 +57,7 @@ namespace ZeroTier class PostgreSQL : public DB { public: - PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort); + PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc = NULL); virtual ~PostgreSQL(); virtual bool waitForReady(); @@ -70,7 +78,13 @@ private: void initializeMembers(PGconn *conn); void heartbeat(); void membersDbWatcher(); + void _membersWatcher_Postgres(PGconn *conn); + void _membersWatcher_RabbitMQ(); void networksDbWatcher(); + void _networksWatcher_Postgres(PGconn *conn); + void _networksWatcher_RabbitMQ(); + + void commitThread(); void onlineNotificationThread(); @@ -100,6 +114,8 @@ private: mutable volatile bool _waitNoticePrinted; int _listenPort; + + mq_config *_mqc; }; }