diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 86327137e..7572682c1 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -512,6 +512,18 @@ void PostgreSQL::initializeNetworks() fprintf(stderr, "Initializing Networks...\n"); + if (_redisMemberStatus) { + try { + if (_rc->clusterMode) { + _cluster->del(setKey); + } else { + _redis->del(setKey); + } + } catch (...) {} + } + + std::unordered_set networkSet; + char qbuf[2048] = {0}; sprintf(qbuf, "SELECT n.id, (EXTRACT(EPOCH FROM n.creation_time AT TIME ZONE 'UTC')*1000)::bigint as creation_time, n.capabilities, " "n.enable_broadcast, (EXTRACT(EPOCH FROM n.last_modified AT TIME ZONE 'UTC')*1000)::bigint AS last_modified, n.mtu, n.multicast_limit, n.name, n.private, n.remote_trace_level, " @@ -616,6 +628,8 @@ void PostgreSQL::initializeNetworks() config["clientId"] = clientId.value_or(""); config["authorizationEndpoint"] = authorizationEndpoint.value_or(""); + networkSet.insert(nwid); + if (dnsDomain.has_value()) { std::string serverList = dnsServers.value(); json obj; @@ -680,6 +694,20 @@ void PostgreSQL::initializeNetworks() _pool->unborrow(c2); _pool->unborrow(c); + if (!networkSet.empty()) { + if (_redisMemberStatus) { + if (_rc->clusterMode) { + auto tx = _cluster->transaction(_myAddressStr, true); + tx.sadd(setKey, networkSet.begin(), networkSet.end()); + tx.exec(); + } else { + auto tx = _cluster->transaction(_myAddressStr, true); + tx.sadd(setKey, networkSet.begin(), networkSet.end()); + tx.exec(); + } + } + } + if (++this->_ready == 2) { if (_waitNoticePrinted) { fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); @@ -703,6 +731,39 @@ void PostgreSQL::initializeMembers() std::unordered_map networkMembers; fprintf(stderr, "Initializing Members...\n"); + std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; + + if (_redisMemberStatus) { + std::lock_guard l(_networks_l); + std::unordered_set deletes; + for ( auto it : _networks) { + uint64_t nwid_i = it.first; + char nwidTmp[64] = {0}; + OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); + std::string nwid(nwidTmp); + std::string key = setKeyBase + nwid; + deletes.insert(key); + } + + if (!deletes.empty()) { + try { + if (_rc->clusterMode) { + auto tx = _cluster->transaction(_myAddressStr, true); + for (std::string k : deletes) { + tx.del(k); + } + tx.exec(); + } else { + auto tx = _redis->transaction(true); + for (std::string k : deletes) { + tx.del(k); + } + tx.exec(); + } + } catch (...) {} + } + } + char qbuf[2048]; sprintf(qbuf, "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, (EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000)::bigint, m.identity, " " (EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, " @@ -782,6 +843,8 @@ void PostgreSQL::initializeMembers() std::optional authenticationExpiryTime = std::get<19>(row); std::string assignedAddresses = std::get<20>(row); + networkMembers.insert(std::pair(setKeyBase+networkId, memberId)); + config["id"] = memberId; config["address"] = memberId; config["nwid"] = networkId; @@ -837,6 +900,24 @@ void PostgreSQL::initializeMembers() _pool->unborrow(c2); _pool->unborrow(c); + if (!networkMembers.empty()) { + if (_redisMemberStatus) { + if (_rc->clusterMode) { + auto tx = _cluster->transaction(_myAddressStr, true); + for (auto it : networkMembers) { + tx.sadd(it.first, it.second); + } + tx.exec(); + } else { + auto tx = _redis->transaction(true); + for (auto it : networkMembers) { + tx.sadd(it.first, it.second); + } + tx.exec(); + } + } + } + if (++this->_ready == 2) { if (_waitNoticePrinted) { fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); @@ -905,6 +986,14 @@ void PostgreSQL::heartbeat() } _pool->unborrow(c); + if (_redisMemberStatus) { + if (_rc->clusterMode) { + _cluster->zadd("controllers", "controllerId", ts); + } else { + _redis->zadd("controllers", "controllerId", ts); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } fprintf(stderr, "Exited heartbeat thread\n"); @@ -1353,6 +1442,20 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "%s ERROR: Error updating network: %s\n", _myAddressStr.c_str(), e.what()); } + if (_redisMemberStatus) { + try { + std::string id = config["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "networks:{" + controllerId + "}"; + if (_rc->clusterMode) { + _cluster->sadd(key, id); + } else { + _redis->sadd(key, id); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + } + } } else if (objtype == "_delete_network") { // fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str()); try { @@ -1367,6 +1470,22 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what()); } + if (_redisMemberStatus) { + try { + std::string id = config["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "networks:{" + controllerId + "}"; + if (_rc->clusterMode) { + _cluster->srem(key, id); + _cluster->del("network-nodes-online:{"+controllerId+"}:"+id); + } else { + _redis->srem(key, id); + _redis->del("network-nodes-online:{"+controllerId+"}:"+id); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + } + } } else if (objtype == "_delete_member") { // fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str()); @@ -1384,6 +1503,23 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what()); } + if (_redisMemberStatus) { + try { + std::string memberId = config["id"]; + std::string networkId = config["nwid"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; + if (_rc->clusterMode) { + _cluster->srem(key, memberId); + _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + } else { + _redis->srem(key, memberId); + _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); + } + } } else { fprintf(stderr, "%s ERROR: unknown objtype\n", _myAddressStr.c_str()); } @@ -1408,6 +1544,15 @@ void PostgreSQL::onlineNotificationThread() } } +/** + * ONLY UNCOMMENT FOR TEMPORARY DB MAINTENANCE + * + * This define temproarly turns off writing to the member status table + * so it can be reindexed when the indexes get too large. + */ + +// #define DISABLE_MEMBER_STATUS 1 + void PostgreSQL::onlineNotification_Postgres() { _connected = 1; @@ -1423,7 +1568,8 @@ void PostgreSQL::onlineNotification_Postgres() std::lock_guard l(_lastOnline_l); lastOnline.swap(_lastOnline); } - + +#ifndef DISABLE_MEMBER_STATUS pqxx::work w(*c->c); pqxx::work w2(*c2->c); @@ -1482,6 +1628,7 @@ void PostgreSQL::onlineNotification_Postgres() pipe.complete(); w.commit(); fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), updateCount); +#endif } catch (std::exception &e) { fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what()); }