diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 44ba2ae5d..c12357fe3 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -273,18 +273,18 @@ void PostgreSQL::initializeNetworks(PGconn *conn) std::string setKey = "networks:{" + _myAddressStr + "}"; - if (_rc != NULL) { - try { - if (_rc->clusterMode) { - _cluster->del(setKey); - } else { - _redis->del(setKey); - } - } catch (sw::redis::Error &e) { - // del can throw an error if the key doesn't exist - // swallow it and move along - } - } + // if (_rc != NULL) { + // try { + // if (_rc->clusterMode) { + // _cluster->del(setKey); + // } else { + // _redis->del(setKey); + // } + // } catch (sw::redis::Error &e) { + // // del can throw an error if the key doesn't exist + // // swallow it and move along + // } + // } std::unordered_set networkSet; @@ -475,17 +475,17 @@ void PostgreSQL::initializeNetworks(PGconn *conn) PQclear(res); - if(!networkSet.empty()) { - if (_rc && _rc->clusterMode) { - auto tx = _cluster->transaction(_myAddressStr, true); - tx.sadd(setKey, networkSet.begin(), networkSet.end()); - tx.exec(); - } else if (_rc && !_rc->clusterMode) { - auto tx = _redis->transaction(true); - tx.sadd(setKey, networkSet.begin(), networkSet.end()); - tx.exec(); - } - } + // if(!networkSet.empty()) { + // if (_rc && _rc->clusterMode) { + // auto tx = _cluster->transaction(_myAddressStr, true); + // tx.sadd(setKey, networkSet.begin(), networkSet.end()); + // tx.exec(); + // } else if (_rc && !_rc->clusterMode) { + // auto tx = _redis->transaction(true); + // tx.sadd(setKey, networkSet.begin(), networkSet.end()); + // tx.exec(); + // } + // } if (++this->_ready == 2) { if (_waitNoticePrinted) { @@ -509,36 +509,36 @@ void PostgreSQL::initializeMembers(PGconn *conn) fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); exit(1); } - std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; + // std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; - if (_rc != NULL) { - std::lock_guard l(_networks_l); - std::unordered_set deletes; - for ( auto it : _networks) { - uint64_t nwid_i = it.first; - char nwidTmp[64] = {0}; - OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); - std::string nwid(nwidTmp); - std::string key = setKeyBase + nwid; - deletes.insert(key); - } + // if (_rc != NULL) { + // std::lock_guard l(_networks_l); + // std::unordered_set deletes; + // for ( auto it : _networks) { + // uint64_t nwid_i = it.first; + // char nwidTmp[64] = {0}; + // OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); + // std::string nwid(nwidTmp); + // std::string key = setKeyBase + nwid; + // deletes.insert(key); + // } - if (!deletes.empty()) { - if (_rc->clusterMode) { - auto tx = _cluster->transaction(_myAddressStr, true); - for (std::string k : deletes) { - tx.del(k); - } - tx.exec(); - } else { - auto tx = _redis->transaction(true); - for (std::string k : deletes) { - tx.del(k); - } - tx.exec(); - } - } - } + // if (!deletes.empty()) { + // if (_rc->clusterMode) { + // auto tx = _cluster->transaction(_myAddressStr, true); + // for (std::string k : deletes) { + // tx.del(k); + // } + // tx.exec(); + // } else { + // auto tx = _redis->transaction(true); + // for (std::string k : deletes) { + // tx.del(k); + // } + // tx.exec(); + // } + // } + // } const char *params[1] = { _myAddressStr.c_str() @@ -578,7 +578,7 @@ void PostgreSQL::initializeMembers(PGconn *conn) std::string memberId(PQgetvalue(res, i, 0)); std::string networkId(PQgetvalue(res, i, 1)); - networkMembers.insert(std::pair(setKeyBase+networkId, memberId)); + // networkMembers.insert(std::pair(setKeyBase+networkId, memberId)); std::string ctime = PQgetvalue(res, i, 5); config["id"] = memberId; @@ -685,23 +685,23 @@ void PostgreSQL::initializeMembers(PGconn *conn) PQclear(res); - if (!networkMembers.empty()) { - if (_rc != NULL) { - if (_rc->clusterMode) { - auto tx = _cluster->transaction(_myAddressStr, true); - for (auto it : networkMembers) { - tx.sadd(it.first, it.second); - } - tx.exec(); - } else { - auto tx = _redis->transaction(true); - for (auto it : networkMembers) { - tx.sadd(it.first, it.second); - } - tx.exec(); - } - } - } + // if (!networkMembers.empty()) { + // if (_rc != NULL) { + // if (_rc->clusterMode) { + // auto tx = _cluster->transaction(_myAddressStr, true); + // for (auto it : networkMembers) { + // tx.sadd(it.first, it.second); + // } + // tx.exec(); + // } else { + // auto tx = _redis->transaction(true); + // for (auto it : networkMembers) { + // tx.sadd(it.first, it.second); + // } + // tx.exec(); + // } + // } + // } if (++this->_ready == 2) { if (_waitNoticePrinted) { fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); @@ -755,7 +755,7 @@ void PostgreSQL::heartbeat() std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); std::string now = std::to_string(ts); std::string host_port = std::to_string(_listenPort); - std::string use_redis = (_rc != NULL) ? "true" : "false"; + std::string use_redis = "false"; // (_rc != NULL) ? "true" : "false"; const char *values[10] = { controllerId, hostname, @@ -788,13 +788,13 @@ void PostgreSQL::heartbeat() } PQclear(res); } - if (_rc != NULL) { - if (_rc->clusterMode) { - _cluster->zadd("controllers", controllerId, ts); - } else { - _redis->zadd("controllers", controllerId, ts); - } - } + // if (_rc != NULL) { + // if (_rc->clusterMode) { + // _cluster->zadd("controllers", controllerId, ts); + // } else { + // _redis->zadd("controllers", controllerId, ts); + // } + // } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } @@ -833,6 +833,7 @@ void PostgreSQL::membersDbWatcher() void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { char buf[11] = {0}; std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); + fprintf(stderr, "Listening to member stream: %s\n", cmd.c_str()); PGresult *res = PQexec(conn, cmd.c_str()); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); @@ -874,7 +875,7 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { void PostgreSQL::_membersWatcher_Redis() { char buf[11] = {0}; std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}"; - + fprintf(stderr, "Listening to member stream: %s\n", key.c_str()); while (_run == 1) { try { json tmp; @@ -1515,20 +1516,20 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); } - if (_rc != NULL) { - try { - std::string id = (*config)["id"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "networks:{" + controllerId + "}"; - if (_rc->clusterMode) { - _cluster->sadd(key, id); - } else { - _redis->sadd(key, id); - } - } catch (sw::redis::Error &e) { - fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); - } - } + // if (_rc != NULL) { + // try { + // std::string id = (*config)["id"]; + // std::string controllerId = _myAddressStr.c_str(); + // std::string key = "networks:{" + controllerId + "}"; + // if (_rc->clusterMode) { + // _cluster->sadd(key, id); + // } else { + // _redis->sadd(key, id); + // } + // } catch (sw::redis::Error &e) { + // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + // } + // } } else if (objtype == "_delete_network") { try { std::string networkId = (*config)["nwid"]; @@ -1552,22 +1553,22 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); } - if (_rc != NULL) { - try { - std::string id = (*config)["id"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "networks:{" + controllerId + "}"; - if (_rc->clusterMode) { - _cluster->srem(key, id); - _cluster->del("network-nodes-online:{"+controllerId+"}:"+id); - } else { - _redis->srem(key, id); - _redis->del("network-nodes-online:{"+controllerId+"}:"+id); - } - } catch (sw::redis::Error &e) { - fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); - } - } + // if (_rc != NULL) { + // try { + // std::string id = (*config)["id"]; + // std::string controllerId = _myAddressStr.c_str(); + // std::string key = "networks:{" + controllerId + "}"; + // if (_rc->clusterMode) { + // _cluster->srem(key, id); + // _cluster->del("network-nodes-online:{"+controllerId+"}:"+id); + // } else { + // _redis->srem(key, id); + // _redis->del("network-nodes-online:{"+controllerId+"}:"+id); + // } + // } catch (sw::redis::Error &e) { + // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + // } + // } } else if (objtype == "_delete_member") { try { std::string memberId = (*config)["id"]; @@ -1595,23 +1596,23 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); } - if (_rc != NULL) { - try { - std::string memberId = (*config)["id"]; - std::string networkId = (*config)["nwid"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; - if (_rc->clusterMode) { - _cluster->srem(key, memberId); - _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); - } else { - _redis->srem(key, memberId); - _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); - } - } catch (sw::redis::Error &e) { - fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); - } - } + // if (_rc != NULL) { + // try { + // std::string memberId = (*config)["id"]; + // std::string networkId = (*config)["nwid"]; + // std::string controllerId = _myAddressStr.c_str(); + // std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; + // if (_rc->clusterMode) { + // _cluster->srem(key, memberId); + // _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + // } else { + // _redis->srem(key, memberId); + // _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); + // } + // } catch (sw::redis::Error &e) { + // fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); + // } + // } } else { fprintf(stderr, "ERROR: unknown objtype"); } @@ -1619,7 +1620,7 @@ void PostgreSQL::commitThread() fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what()); } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } PQfinish(conn); @@ -1634,11 +1635,11 @@ void PostgreSQL::onlineNotificationThread() { waitForReady(); - if (_rc != NULL) { - onlineNotification_Redis(); - } else { + // if (_rc != NULL) { + // onlineNotification_Redis(); + // } else { onlineNotification_Postgres(); - } + // } } void PostgreSQL::onlineNotification_Postgres() @@ -1783,7 +1784,7 @@ void PostgreSQL::onlineNotification_Redis() fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what()); #endif } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::seconds(10)); } }