enable redis member status again

This commit is contained in:
Grant Limberg 2022-05-10 08:36:39 -07:00
parent 5fcaed086d
commit 9ddc0327d4
No known key found for this signature in database
GPG Key ID: 8F2F97D3BE8D7735
2 changed files with 25 additions and 7 deletions

View File

@ -170,6 +170,7 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
, _rc(rc) , _rc(rc)
, _redis(NULL) , _redis(NULL)
, _cluster(NULL) , _cluster(NULL)
, _redisMemberStatus(false)
{ {
char myAddress[64]; char myAddress[64];
_myAddressStr = myId.address().toString(myAddress); _myAddressStr = myId.address().toString(myAddress);
@ -189,6 +190,11 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
// it will be padded at the end with zeroes. If longer, it'll be truncated. // it will be padded at the end with zeroes. If longer, it'll be truncated.
Utils::unhex(ssoPskHex, _ssoPsk, sizeof(_ssoPsk)); Utils::unhex(ssoPskHex, _ssoPsk, sizeof(_ssoPsk));
} }
const char *redisMemberStatus = getenv("ZT_REDIS_MEMBER_STATUS");
if (redisMemberStatus) {
_redisMemberStatus = true;
fprintf(stderr, "Using redis for member status\n");
}
auto c = _pool->borrow(); auto c = _pool->borrow();
pqxx::work txn{*c->c}; pqxx::work txn{*c->c};
@ -1391,8 +1397,12 @@ void PostgreSQL::commitThread()
void PostgreSQL::onlineNotificationThread() void PostgreSQL::onlineNotificationThread()
{ {
waitForReady(); waitForReady();
if (_redisMemberStatus) {
onlineNotification_Redis();
} else {
onlineNotification_Postgres(); onlineNotification_Postgres();
} }
}
void PostgreSQL::onlineNotification_Postgres() void PostgreSQL::onlineNotification_Postgres()
{ {
@ -1511,9 +1521,7 @@ void PostgreSQL::onlineNotification_Redis()
} }
} }
} catch (sw::redis::Error &e) { } catch (sw::redis::Error &e) {
#ifdef ZT_TRACE
fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what()); fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what());
#endif
} }
std::this_thread::sleep_for(std::chrono::seconds(10)); std::this_thread::sleep_for(std::chrono::seconds(10));
} }
@ -1524,6 +1532,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
{ {
nlohmann::json jtmp1, jtmp2; nlohmann::json jtmp1, jtmp2;
uint64_t count = 0;
for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
uint64_t nwid_i = i->first.first; uint64_t nwid_i = i->first.first;
uint64_t memberid_i = i->first.second; uint64_t memberid_i = i->first.second;
@ -1555,14 +1564,21 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
.zadd("active-networks:{"+controllerId+"}", networkId, ts) .zadd("active-networks:{"+controllerId+"}", networkId, ts)
.sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId) .sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId)
.hmset("member:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end()); .hmset("member:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end());
++count;
} }
// expire records from all-nodes and network-nodes member list // expire records from all-nodes and network-nodes member list
uint64_t expireOld = OSUtils::now() - 300000; uint64_t expireOld = OSUtils::now() - 300000;
tx.zremrangebyscore("nodes-online:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN)); tx.zremrangebyscore("nodes-online:{"+controllerId+"}",
tx.zremrangebyscore("nodes-online2:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN)); sw::redis::RightBoundedInterval<double>(expireOld,
tx.zremrangebyscore("active-networks:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN)); sw::redis::BoundType::LEFT_OPEN));
tx.zremrangebyscore("nodes-online2:{"+controllerId+"}",
sw::redis::RightBoundedInterval<double>(expireOld,
sw::redis::BoundType::LEFT_OPEN));
tx.zremrangebyscore("active-networks:{"+controllerId+"}",
sw::redis::RightBoundedInterval<double>(expireOld,
sw::redis::BoundType::LEFT_OPEN));
{ {
std::lock_guard<std::mutex> l(_networks_l); std::lock_guard<std::mutex> l(_networks_l);
for (const auto &it : _networks) { for (const auto &it : _networks) {
@ -1574,6 +1590,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
} }
} }
tx.exec(); tx.exec();
fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), count);
} }

View File

@ -174,6 +174,7 @@ private:
RedisConfig *_rc; RedisConfig *_rc;
std::shared_ptr<sw::redis::Redis> _redis; std::shared_ptr<sw::redis::Redis> _redis;
std::shared_ptr<sw::redis::RedisCluster> _cluster; std::shared_ptr<sw::redis::RedisCluster> _cluster;
bool _redisMemberStatus;
}; };
} // namespace ZeroTier } // namespace ZeroTier