From 9ddc0327d442cae13d9e3253eea6915621b21824 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Tue, 10 May 2022 08:36:39 -0700 Subject: [PATCH] enable redis member status again --- controller/PostgreSQL.cpp | 31 ++++++++++++++++++++++++------- controller/PostgreSQL.hpp | 1 + 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index c6623d556..1f52da699 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -170,6 +170,7 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R , _rc(rc) , _redis(NULL) , _cluster(NULL) + , _redisMemberStatus(false) { char myAddress[64]; _myAddressStr = myId.address().toString(myAddress); @@ -189,6 +190,11 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R // it will be padded at the end with zeroes. If longer, it'll be truncated. Utils::unhex(ssoPskHex, _ssoPsk, sizeof(_ssoPsk)); } + const char *redisMemberStatus = getenv("ZT_REDIS_MEMBER_STATUS"); + if (redisMemberStatus) { + _redisMemberStatus = true; + fprintf(stderr, "Using redis for member status\n"); + } auto c = _pool->borrow(); pqxx::work txn{*c->c}; @@ -1390,8 +1396,12 @@ void PostgreSQL::commitThread() void PostgreSQL::onlineNotificationThread() { - waitForReady(); - onlineNotification_Postgres(); + waitForReady(); + if (_redisMemberStatus) { + onlineNotification_Redis(); + } else { + onlineNotification_Postgres(); + } } void PostgreSQL::onlineNotification_Postgres() @@ -1511,9 +1521,7 @@ void PostgreSQL::onlineNotification_Redis() } } } catch (sw::redis::Error &e) { -#ifdef ZT_TRACE fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what()); -#endif } std::this_thread::sleep_for(std::chrono::seconds(10)); } @@ -1524,6 +1532,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control { nlohmann::json jtmp1, jtmp2; + uint64_t count = 0; for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { uint64_t nwid_i = i->first.first; uint64_t memberid_i = i->first.second; @@ -1555,14 +1564,21 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control .zadd("active-networks:{"+controllerId+"}", networkId, ts) .sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId) .hmset("member:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end()); + ++count; } // expire records from all-nodes and network-nodes member list uint64_t expireOld = OSUtils::now() - 300000; - tx.zremrangebyscore("nodes-online:{"+controllerId+"}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - tx.zremrangebyscore("nodes-online2:{"+controllerId+"}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); - tx.zremrangebyscore("active-networks:{"+controllerId+"}", sw::redis::RightBoundedInterval(expireOld, sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore("nodes-online:{"+controllerId+"}", + sw::redis::RightBoundedInterval(expireOld, + sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore("nodes-online2:{"+controllerId+"}", + sw::redis::RightBoundedInterval(expireOld, + sw::redis::BoundType::LEFT_OPEN)); + tx.zremrangebyscore("active-networks:{"+controllerId+"}", + sw::redis::RightBoundedInterval(expireOld, + sw::redis::BoundType::LEFT_OPEN)); { std::lock_guard l(_networks_l); for (const auto &it : _networks) { @@ -1574,6 +1590,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control } } tx.exec(); + fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), count); } diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index acbc65a67..dfda7f698 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -174,6 +174,7 @@ private: RedisConfig *_rc; std::shared_ptr _redis; std::shared_ptr _cluster; + bool _redisMemberStatus; }; } // namespace ZeroTier