From 563655a1a410f6b86d86b3f3747bc1a8d2e88d76 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Tue, 12 May 2020 11:56:19 -0700 Subject: [PATCH] Redis now usable as a message queue --- controller/PostgreSQL.cpp | 107 +++++++++++++++++++++++++++++++++----- controller/PostgreSQL.hpp | 7 +-- make-mac.mk | 3 +- 3 files changed, 101 insertions(+), 16 deletions(-) diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 0640cb8ee..31d5b9118 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -68,6 +68,10 @@ std::string join(const std::vector &elements, const char * const se using namespace ZeroTier; +using Attrs = std::vector>; +using Item = std::pair; +using ItemStream = std::vector; + PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc) : DB() , _myId(myId) @@ -124,9 +128,9 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R opts.db = 0; poolOpts.size = 10; if (_rc->clusterMode) { - _cluster = new sw::redis::RedisCluster(opts, poolOpts); + _cluster = std::make_shared(opts, poolOpts); } else { - _redis = new sw::redis::Redis(opts, poolOpts); + _redis = std::make_shared(opts, poolOpts); } } @@ -145,17 +149,15 @@ PostgreSQL::~PostgreSQL() _run = 0; std::this_thread::sleep_for(std::chrono::milliseconds(100)); - - _heartbeatThread.join(); _membersDbWatcher.join(); _networksDbWatcher.join(); + _commitQueue.stop(); for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) { _commitThread[i].join(); } _onlineNotificationThread.join(); - delete _redis; - delete _cluster; + fprintf(stderr, "~PostgreSQL() done\n"); } @@ -651,6 +653,7 @@ void PostgreSQL::heartbeat() PQfinish(conn); conn = NULL; + fprintf(stderr, "Exited heartbeat thread\n"); } void PostgreSQL::membersDbWatcher() @@ -664,10 +667,10 @@ void PostgreSQL::membersDbWatcher() initializeMembers(conn); - if (false) { - // PQfinish(conn); - // conn = NULL; - // _membersWatcher_RabbitMQ(); + if (_rc) { + PQfinish(conn); + conn = NULL; + _membersWatcher_Redis(); } else { _membersWatcher_Postgres(conn); PQfinish(conn); @@ -722,9 +725,47 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { } } -void PostgreSQL::_membersWatcher_Reids() { - char buff[11] = {0}; +void PostgreSQL::_membersWatcher_Redis() { + char buf[11] = {0}; + std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}"; + while (_run == 1) { + json tmp; + std::unordered_map result; + if (_rc->clusterMode) { + _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } else { + _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } + if (!result.empty()) { + for (auto element : result) { + fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); + for (auto rec : element.second) { + std::string id = rec.first; + auto attrs = rec.second; + fprintf(stdout, "Record ID: %s\n", id.c_str()); + fprintf(stdout, "attrs len: %lu\n", attrs.size()); + for (auto a : attrs) { + fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); + try { + tmp = json::parse(a.second); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (...) { + fprintf(stderr, "json parse error in networkWatcher_Redis\n"); + } + } + } + } + } + } + fprintf(stderr, "membersWatcher ended\n"); } void PostgreSQL::networksDbWatcher() @@ -795,7 +836,48 @@ void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { } void PostgreSQL::_networksWatcher_Redis() { + char buf[11] = {0}; + std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}"; + + while (_run == 1) { + json tmp; + std::unordered_map result; + if (_rc->clusterMode) { + _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } else { + _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } + + if (!result.empty()) { + for (auto element : result) { + fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); + for (auto rec : element.second) { + std::string id = rec.first; + auto attrs = rec.second; + fprintf(stdout, "Record ID: %s\n", id.c_str()); + fprintf(stdout, "attrs len: %lu\n", attrs.size()); + for (auto a : attrs) { + fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); + try { + tmp = json::parse(a.second); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (...) { + fprintf(stderr, "json parse error in networkWatcher_Redis\n"); + } + } + } + } + } + } + fprintf(stderr, "networksWatcher ended\n"); } void PostgreSQL::commitThread() @@ -1293,6 +1375,7 @@ void PostgreSQL::commitThread() fprintf(stderr, "ERROR: %s commitThread should still be running! Exiting Controller.\n", _myAddressStr.c_str()); exit(7); } + fprintf(stderr, "commitThread finished\n"); } void PostgreSQL::onlineNotificationThread() diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index 986559acf..44347cd81 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -20,6 +20,7 @@ #define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4 +#include #include extern "C" { @@ -64,7 +65,7 @@ private: void networksDbWatcher(); void _networksWatcher_Postgres(PGconn *conn); - void _membersWatcher_Reids(); + void _membersWatcher_Redis(); void _networksWatcher_Redis(); void commitThread(); @@ -100,8 +101,8 @@ private: int _listenPort; RedisConfig *_rc; - sw::redis::Redis *_redis; - sw::redis::RedisCluster *_cluster; + std::shared_ptr _redis; + std::shared_ptr _cluster; }; } // namespace ZeroTier diff --git a/make-mac.mk b/make-mac.mk index 6625dc85e..5e7a67c20 100644 --- a/make-mac.mk +++ b/make-mac.mk @@ -28,9 +28,10 @@ include objects.mk ONE_OBJS+=osdep/MacEthernetTap.o osdep/MacKextEthernetTap.o ext/http-parser/http_parser.o ifeq ($(ZT_CONTROLLER),1) - LIBS+=-L/usr/local/opt/libpq/lib -lpq -Lext/redis-plus-plus-1.1.1/install/macos/lib -lredis++ -Lext/hiredis-0.14.1/lib/macos -lhiredis + LIBS+=-L/usr/local/opt/libpq/lib -lpq ext/redis-plus-plus-1.1.1/install/macos/lib/libredis++.a ext/hiredis-0.14.1/lib/macos/libhiredis.a DEFS+=-DZT_CONTROLLER_USE_LIBPQ -DZT_CONTROLLER_USE_REDIS -DZT_CONTROLLER INCLUDES+=-I/usr/local/opt/libpq/include -Iext/hiredis-0.14.1/include/ -Iext/redis-plus-plus-1.1.1/install/macos/include/sw/ + endif # Official releases are signed with our Apple cert and apply software updates by default