revert redis for member status

This commit is contained in:
Grant Limberg 2020-07-23 09:38:50 -07:00
parent d0aacfddb7
commit 251b06d812
No known key found for this signature in database
GPG Key ID: 2BA62CCABBB4095A

View File

@ -273,18 +273,18 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
std::string setKey = "networks:{" + _myAddressStr + "}"; std::string setKey = "networks:{" + _myAddressStr + "}";
if (_rc != NULL) { // if (_rc != NULL) {
try { // try {
if (_rc->clusterMode) { // if (_rc->clusterMode) {
_cluster->del(setKey); // _cluster->del(setKey);
} else { // } else {
_redis->del(setKey); // _redis->del(setKey);
} // }
} catch (sw::redis::Error &e) { // } catch (sw::redis::Error &e) {
// del can throw an error if the key doesn't exist // // del can throw an error if the key doesn't exist
// swallow it and move along // // swallow it and move along
} // }
} // }
std::unordered_set<std::string> networkSet; std::unordered_set<std::string> networkSet;
@ -437,17 +437,17 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
PQclear(res); PQclear(res);
if(!networkSet.empty()) { // if(!networkSet.empty()) {
if (_rc && _rc->clusterMode) { // if (_rc && _rc->clusterMode) {
auto tx = _cluster->transaction(_myAddressStr, true); // auto tx = _cluster->transaction(_myAddressStr, true);
tx.sadd(setKey, networkSet.begin(), networkSet.end()); // tx.sadd(setKey, networkSet.begin(), networkSet.end());
tx.exec(); // tx.exec();
} else if (_rc && !_rc->clusterMode) { // } else if (_rc && !_rc->clusterMode) {
auto tx = _redis->transaction(true); // auto tx = _redis->transaction(true);
tx.sadd(setKey, networkSet.begin(), networkSet.end()); // tx.sadd(setKey, networkSet.begin(), networkSet.end());
tx.exec(); // tx.exec();
} // }
} // }
if (++this->_ready == 2) { if (++this->_ready == 2) {
if (_waitNoticePrinted) { if (_waitNoticePrinted) {
@ -471,36 +471,36 @@ void PostgreSQL::initializeMembers(PGconn *conn)
fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
exit(1); exit(1);
} }
std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:"; // std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
if (_rc != NULL) { // if (_rc != NULL) {
std::lock_guard<std::mutex> l(_networks_l); // std::lock_guard<std::mutex> l(_networks_l);
std::unordered_set<std::string> deletes; // std::unordered_set<std::string> deletes;
for ( auto it : _networks) { // for ( auto it : _networks) {
uint64_t nwid_i = it.first; // uint64_t nwid_i = it.first;
char nwidTmp[64] = {0}; // char nwidTmp[64] = {0};
OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); // OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
std::string nwid(nwidTmp); // std::string nwid(nwidTmp);
std::string key = setKeyBase + nwid; // std::string key = setKeyBase + nwid;
deletes.insert(key); // deletes.insert(key);
} // }
if (!deletes.empty()) { // if (!deletes.empty()) {
if (_rc->clusterMode) { // if (_rc->clusterMode) {
auto tx = _cluster->transaction(_myAddressStr, true); // auto tx = _cluster->transaction(_myAddressStr, true);
for (std::string k : deletes) { // for (std::string k : deletes) {
tx.del(k); // tx.del(k);
} // }
tx.exec(); // tx.exec();
} else { // } else {
auto tx = _redis->transaction(true); // auto tx = _redis->transaction(true);
for (std::string k : deletes) { // for (std::string k : deletes) {
tx.del(k); // tx.del(k);
} // }
tx.exec(); // tx.exec();
} // }
} // }
} // }
const char *params[1] = { const char *params[1] = {
_myAddressStr.c_str() _myAddressStr.c_str()
@ -540,7 +540,7 @@ void PostgreSQL::initializeMembers(PGconn *conn)
std::string memberId(PQgetvalue(res, i, 0)); std::string memberId(PQgetvalue(res, i, 0));
std::string networkId(PQgetvalue(res, i, 1)); std::string networkId(PQgetvalue(res, i, 1));
networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId)); // networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId));
std::string ctime = PQgetvalue(res, i, 5); std::string ctime = PQgetvalue(res, i, 5);
config["id"] = memberId; config["id"] = memberId;
@ -647,23 +647,23 @@ void PostgreSQL::initializeMembers(PGconn *conn)
PQclear(res); PQclear(res);
if (!networkMembers.empty()) { // if (!networkMembers.empty()) {
if (_rc != NULL) { // if (_rc != NULL) {
if (_rc->clusterMode) { // if (_rc->clusterMode) {
auto tx = _cluster->transaction(_myAddressStr, true); // auto tx = _cluster->transaction(_myAddressStr, true);
for (auto it : networkMembers) { // for (auto it : networkMembers) {
tx.sadd(it.first, it.second); // tx.sadd(it.first, it.second);
} // }
tx.exec(); // tx.exec();
} else { // } else {
auto tx = _redis->transaction(true); // auto tx = _redis->transaction(true);
for (auto it : networkMembers) { // for (auto it : networkMembers) {
tx.sadd(it.first, it.second); // tx.sadd(it.first, it.second);
} // }
tx.exec(); // tx.exec();
} // }
} // }
} // }
if (++this->_ready == 2) { if (++this->_ready == 2) {
if (_waitNoticePrinted) { if (_waitNoticePrinted) {
fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
@ -717,7 +717,7 @@ void PostgreSQL::heartbeat()
std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
std::string now = std::to_string(ts); std::string now = std::to_string(ts);
std::string host_port = std::to_string(_listenPort); std::string host_port = std::to_string(_listenPort);
std::string use_redis = (_rc != NULL) ? "true" : "false"; std::string use_redis = "false"; // (_rc != NULL) ? "true" : "false";
const char *values[10] = { const char *values[10] = {
controllerId, controllerId,
hostname, hostname,
@ -750,13 +750,13 @@ void PostgreSQL::heartbeat()
} }
PQclear(res); PQclear(res);
} }
if (_rc != NULL) { // if (_rc != NULL) {
if (_rc->clusterMode) { // if (_rc->clusterMode) {
_cluster->zadd("controllers", controllerId, ts); // _cluster->zadd("controllers", controllerId, ts);
} else { // } else {
_redis->zadd("controllers", controllerId, ts); // _redis->zadd("controllers", controllerId, ts);
} // }
} // }
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} }
@ -1445,20 +1445,20 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) { } catch (std::exception &e) {
fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
} }
if (_rc != NULL) { // if (_rc != NULL) {
try { // try {
std::string id = (*config)["id"]; // std::string id = (*config)["id"];
std::string controllerId = _myAddressStr.c_str(); // std::string controllerId = _myAddressStr.c_str();
std::string key = "networks:{" + controllerId + "}"; // std::string key = "networks:{" + controllerId + "}";
if (_rc->clusterMode) { // if (_rc->clusterMode) {
_cluster->sadd(key, id); // _cluster->sadd(key, id);
} else { // } else {
_redis->sadd(key, id); // _redis->sadd(key, id);
} // }
} catch (sw::redis::Error &e) { // } catch (sw::redis::Error &e) {
fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
} // }
} // }
} else if (objtype == "_delete_network") { } else if (objtype == "_delete_network") {
try { try {
std::string networkId = (*config)["nwid"]; std::string networkId = (*config)["nwid"];
@ -1482,22 +1482,22 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) { } catch (std::exception &e) {
fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what());
} }
if (_rc != NULL) { // if (_rc != NULL) {
try { // try {
std::string id = (*config)["id"]; // std::string id = (*config)["id"];
std::string controllerId = _myAddressStr.c_str(); // std::string controllerId = _myAddressStr.c_str();
std::string key = "networks:{" + controllerId + "}"; // std::string key = "networks:{" + controllerId + "}";
if (_rc->clusterMode) { // if (_rc->clusterMode) {
_cluster->srem(key, id); // _cluster->srem(key, id);
_cluster->del("network-nodes-online:{"+controllerId+"}:"+id); // _cluster->del("network-nodes-online:{"+controllerId+"}:"+id);
} else { // } else {
_redis->srem(key, id); // _redis->srem(key, id);
_redis->del("network-nodes-online:{"+controllerId+"}:"+id); // _redis->del("network-nodes-online:{"+controllerId+"}:"+id);
} // }
} catch (sw::redis::Error &e) { // } catch (sw::redis::Error &e) {
fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
} // }
} // }
} else if (objtype == "_delete_member") { } else if (objtype == "_delete_member") {
try { try {
std::string memberId = (*config)["id"]; std::string memberId = (*config)["id"];
@ -1525,23 +1525,23 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) { } catch (std::exception &e) {
fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what());
} }
if (_rc != NULL) { // if (_rc != NULL) {
try { // try {
std::string memberId = (*config)["id"]; // std::string memberId = (*config)["id"];
std::string networkId = (*config)["nwid"]; // std::string networkId = (*config)["nwid"];
std::string controllerId = _myAddressStr.c_str(); // std::string controllerId = _myAddressStr.c_str();
std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; // std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
if (_rc->clusterMode) { // if (_rc->clusterMode) {
_cluster->srem(key, memberId); // _cluster->srem(key, memberId);
_cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId); // _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
} else { // } else {
_redis->srem(key, memberId); // _redis->srem(key, memberId);
_redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId); // _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
} // }
} catch (sw::redis::Error &e) { // } catch (sw::redis::Error &e) {
fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); // fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
} // }
} // }
} else { } else {
fprintf(stderr, "ERROR: unknown objtype"); fprintf(stderr, "ERROR: unknown objtype");
} }
@ -1549,7 +1549,7 @@ void PostgreSQL::commitThread()
fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what()); fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
} }
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
PQfinish(conn); PQfinish(conn);
@ -1564,11 +1564,11 @@ void PostgreSQL::onlineNotificationThread()
{ {
waitForReady(); waitForReady();
if (_rc != NULL) { // if (_rc != NULL) {
onlineNotification_Redis(); // onlineNotification_Redis();
} else { // } else {
onlineNotification_Postgres(); onlineNotification_Postgres();
} // }
} }
void PostgreSQL::onlineNotification_Postgres() void PostgreSQL::onlineNotification_Postgres()