From 879ef58565d7758b0fecf9c2275c9848f26d1d00 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Wed, 20 May 2020 16:28:28 -0700 Subject: [PATCH] Finalize Redis integration --- controller/PostgreSQL.cpp | 152 ++++++++++++++++++++++++++++++++------ 1 file changed, 131 insertions(+), 21 deletions(-) diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 356166fb7..0c81ead91 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -113,11 +113,8 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION); exit(1); } - PQclear(res); res = NULL; - PQfinish(conn); - conn = NULL; if (_rc != NULL) { sw::redis::ConnectionOptions opts; @@ -137,6 +134,16 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R } _readyLock.lock(); + + fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt()); + _waitNoticePrinted = true; + + initializeNetworks(conn); + initializeMembers(conn); + + PQfinish(conn); + conn = NULL; + _heartbeatThread = std::thread(&PostgreSQL::heartbeat, this); _membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this); _networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this); @@ -165,10 +172,6 @@ PostgreSQL::~PostgreSQL() bool PostgreSQL::waitForReady() { while (_ready < 2) { - if (!_waitNoticePrinted) { - _waitNoticePrinted = true; - fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt()); - } _readyLock.lock(); _readyLock.unlock(); } @@ -236,6 +239,7 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId) void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId) { char tmp2[24]; + waitForReady(); std::pair tmp, nw; Utils::hex(networkId, tmp2); tmp.first["nwid"] = tmp2; @@ -265,11 +269,28 @@ void PostgreSQL::initializeNetworks(PGconn *conn) fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); exit(1); } + + std::string setKey = "networks:{" + std::string(_myAddressStr.c_str()) + "}"; + if (_rc != NULL) { + try { + if (_rc->clusterMode) { + _cluster->del(setKey); + } else { + _redis->del(setKey); + } + } catch (sw::redis::Error &e) { + // del can throw an error if the key doesn't exist + // swallow it and move along + } + } + const char *params[1] = { _myAddressStr.c_str() }; + fprintf(stderr, "Initializing Networks...\n"); + PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, " "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, " "remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network " @@ -295,9 +316,18 @@ void PostgreSQL::initializeNetworks(PGconn *conn) const char *nwidparam[1] = { PQgetvalue(res, i, 0) }; + std::string nwid = PQgetvalue(res, i, 0); + + if (_rc != NULL) { + if (_rc->clusterMode) { + _cluster->sadd(setKey, nwid); + } else { + _redis->sadd(setKey, nwid); + } + } - config["id"] = PQgetvalue(res, i, 0); - config["nwid"] = PQgetvalue(res, i, 0); + config["id"] = nwid; + config["nwid"] = nwid; try { config["creationTime"] = std::stoull(PQgetvalue(res, i, 1)); } catch (std::exception &e) { @@ -416,8 +446,11 @@ void PostgreSQL::initializeNetworks(PGconn *conn) } _readyLock.unlock(); } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error initializing networks in Redis: %s\n", e.what()); + exit(-1); } catch (std::exception &e) { - fprintf(stderr, "ERROR: Error initializing networks: %s", e.what()); + fprintf(stderr, "ERROR: Error initializing networks: %s\n", e.what()); exit(-1); } } @@ -429,11 +462,32 @@ void PostgreSQL::initializeMembers(PGconn *conn) fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); exit(1); } - + std::string setKeyBase = "network-nodes-all:{" + std::string(_myAddressStr.c_str()) + "}:"; + if (_rc != NULL) { + std::lock_guard l(_networks_l); + for ( auto it : _networks) { + uint64_t nwid_i = it.first; + char nwidTmp[64] = {0}; + OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); + std::string nwid(nwidTmp); + std::string key = setKeyBase + nwid; + if (_rc->clusterMode) { + try { + _cluster->del(key); + } catch (...) {} + } else { + try { + _redis->del(key); + } catch (...) {} + } + } + } + const char *params[1] = { _myAddressStr.c_str() }; + fprintf(stderr, "Initializing Members...\n"); PGresult *res = PQexecParams(conn, "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, " " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, " @@ -464,6 +518,15 @@ void PostgreSQL::initializeMembers(PGconn *conn) std::string memberId(PQgetvalue(res, i, 0)); std::string networkId(PQgetvalue(res, i, 1)); + + if (_rc != NULL) { + if (_rc->clusterMode) { + _cluster->sadd(setKeyBase + networkId, memberId); + } else { + _redis->sadd(setKeyBase + networkId, memberId); + } + } + std::string ctime = PQgetvalue(res, i, 5); config["id"] = memberId; config["nwid"] = networkId; @@ -564,6 +627,14 @@ void PostgreSQL::initializeMembers(PGconn *conn) config["ipAssignments"].push_back(ipaddr); } + if (_rc != NULL) { + if (_rc->clusterMode) { + _cluster->sadd(setKeyBase + networkId, memberId); + } else { + _redis->sadd(setKeyBase + networkId, memberId); + } + } + _memberChanged(empty, config, false); } @@ -575,6 +646,8 @@ void PostgreSQL::initializeMembers(PGconn *conn) } _readyLock.unlock(); } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error initializing members (redis): %s\n", e.what()); } catch (std::exception &e) { fprintf(stderr, "ERROR: Error initializing members: %s\n", e.what()); exit(-1); @@ -670,8 +743,6 @@ void PostgreSQL::membersDbWatcher() exit(1); } - initializeMembers(conn); - if (_rc) { PQfinish(conn); conn = NULL; @@ -797,8 +868,6 @@ void PostgreSQL::networksDbWatcher() exit(1); } - initializeNetworks(conn); - if (_rc) { PQfinish(conn); conn = NULL; @@ -1344,6 +1413,20 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); } + if (_rc != NULL) { + try { + std::string id = (*config)["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "networks:{" + controllerId + "}"; + if (_rc->clusterMode) { + _cluster->sadd(key, id); + } else { + _redis->sadd(key, id); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + } + } } else if (objtype == "_delete_network") { try { std::string networkId = (*config)["nwid"]; @@ -1367,6 +1450,20 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); } + if (_rc != NULL) { + try { + std::string id = (*config)["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "networks:{" + controllerId + "}"; + if (_rc->clusterMode) { + _cluster->srem(key, id); + } else { + _redis->srem(key, id); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + } + } } else if (objtype == "_delete_member") { try { std::string memberId = (*config)["id"]; @@ -1394,6 +1491,21 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); } + if (_rc != NULL) { + try { + std::string memberId = (*config)["id"]; + std::string networkId = (*config)["nwid"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; + if (_rc->clusterMode) { + _cluster->srem(key, memberId); + } else { + _redis->srem(key, memberId); + } + } catch (sw::redis::Error &e) { + fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); + } + } } else { fprintf(stderr, "ERROR: unknown objtype"); } @@ -1414,6 +1526,8 @@ void PostgreSQL::commitThread() void PostgreSQL::onlineNotificationThread() { + waitForReady(); + if (_rc != NULL) { onlineNotification_Redis(); } else { @@ -1569,7 +1683,6 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control std::unordered_map< std::pair,std::pair,_PairHasher > &lastOnline) { - fprintf(stderr, "Redis Update\n"); nlohmann::json jtmp1, jtmp2; for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { uint64_t nwid_i = i->first.first; @@ -1581,12 +1694,9 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", memberid_i); if (!get(nwid_i, jtmp1, memberid_i, jtmp2)){ + fprintf(stderr, "network or member doesn't exist\n"); continue; // skip non existent members/networks } - auto found = _networks.find(nwid_i); - if (found == _networks.end()) { - continue; // skip members trying to join non-existant networks - } std::string networkId(nwidTmp); std::string memberId(memTmp); @@ -1603,7 +1713,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control tx.zadd("nodes-online:{"+controllerId+"}", memberId, ts) .zadd("network-nodes-online:{"+controllerId+"}:"+networkId, memberId, ts) .sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId) - .hmset("network:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end()); + .hmset("member:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end()); } tx.exec();