This commit is contained in:
Adam Ierymenko 2020-08-25 08:07:23 -07:00
commit 47e9fb3ddb
3 changed files with 147 additions and 139 deletions

View File

@ -273,18 +273,18 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
std::string setKey = "networks:{" + _myAddressStr + "}";
if (_rc != NULL) {
try {
if (_rc->clusterMode) {
_cluster->del(setKey);
} else {
_redis->del(setKey);
}
} catch (sw::redis::Error &e) {
// del can throw an error if the key doesn't exist
// swallow it and move along
}
}
// if (_rc != NULL) {
// try {
// if (_rc->clusterMode) {
// _cluster->del(setKey);
// } else {
// _redis->del(setKey);
// }
// } catch (sw::redis::Error &e) {
// // del can throw an error if the key doesn't exist
// // swallow it and move along
// }
// }
std::unordered_set<std::string> networkSet;
@ -475,17 +475,17 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
PQclear(res);
if(!networkSet.empty()) {
if (_rc && _rc->clusterMode) {
auto tx = _cluster->transaction(_myAddressStr, true);
tx.sadd(setKey, networkSet.begin(), networkSet.end());
tx.exec();
} else if (_rc && !_rc->clusterMode) {
auto tx = _redis->transaction(true);
tx.sadd(setKey, networkSet.begin(), networkSet.end());
tx.exec();
}
}
// if(!networkSet.empty()) {
// if (_rc && _rc->clusterMode) {
// auto tx = _cluster->transaction(_myAddressStr, true);
// tx.sadd(setKey, networkSet.begin(), networkSet.end());
// tx.exec();
// } else if (_rc && !_rc->clusterMode) {
// auto tx = _redis->transaction(true);
// tx.sadd(setKey, networkSet.begin(), networkSet.end());
// tx.exec();
// }
// }
if (++this->_ready == 2) {
if (_waitNoticePrinted) {
@ -509,36 +509,36 @@ void PostgreSQL::initializeMembers(PGconn *conn)
fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
exit(1);
}
std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
// std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
if (_rc != NULL) {
std::lock_guard<std::mutex> l(_networks_l);
std::unordered_set<std::string> deletes;
for ( auto it : _networks) {
uint64_t nwid_i = it.first;
char nwidTmp[64] = {0};
OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
std::string nwid(nwidTmp);
std::string key = setKeyBase + nwid;
deletes.insert(key);
}
// if (_rc != NULL) {
// std::lock_guard<std::mutex> l(_networks_l);
// std::unordered_set<std::string> deletes;
// for ( auto it : _networks) {
// uint64_t nwid_i = it.first;
// char nwidTmp[64] = {0};
// OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
// std::string nwid(nwidTmp);
// std::string key = setKeyBase + nwid;
// deletes.insert(key);
// }
if (!deletes.empty()) {
if (_rc->clusterMode) {
auto tx = _cluster->transaction(_myAddressStr, true);
for (std::string k : deletes) {
tx.del(k);
}
tx.exec();
} else {
auto tx = _redis->transaction(true);
for (std::string k : deletes) {
tx.del(k);
}
tx.exec();
}
}
}
// if (!deletes.empty()) {
// if (_rc->clusterMode) {
// auto tx = _cluster->transaction(_myAddressStr, true);
// for (std::string k : deletes) {
// tx.del(k);
// }
// tx.exec();
// } else {
// auto tx = _redis->transaction(true);
// for (std::string k : deletes) {
// tx.del(k);
// }
// tx.exec();
// }
// }
// }
const char *params[1] = {
_myAddressStr.c_str()
@ -578,7 +578,7 @@ void PostgreSQL::initializeMembers(PGconn *conn)
std::string memberId(PQgetvalue(res, i, 0));
std::string networkId(PQgetvalue(res, i, 1));
networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId));
// networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId));
std::string ctime = PQgetvalue(res, i, 5);
config["id"] = memberId;
@ -685,23 +685,23 @@ void PostgreSQL::initializeMembers(PGconn *conn)
PQclear(res);
if (!networkMembers.empty()) {
if (_rc != NULL) {
if (_rc->clusterMode) {
auto tx = _cluster->transaction(_myAddressStr, true);
for (auto it : networkMembers) {
tx.sadd(it.first, it.second);
}
tx.exec();
} else {
auto tx = _redis->transaction(true);
for (auto it : networkMembers) {
tx.sadd(it.first, it.second);
}
tx.exec();
}
}
}
// if (!networkMembers.empty()) {
// if (_rc != NULL) {
// if (_rc->clusterMode) {
// auto tx = _cluster->transaction(_myAddressStr, true);
// for (auto it : networkMembers) {
// tx.sadd(it.first, it.second);
// }
// tx.exec();
// } else {
// auto tx = _redis->transaction(true);
// for (auto it : networkMembers) {
// tx.sadd(it.first, it.second);
// }
// tx.exec();
// }
// }
// }
if (++this->_ready == 2) {
if (_waitNoticePrinted) {
fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
@ -755,7 +755,7 @@ void PostgreSQL::heartbeat()
std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
std::string now = std::to_string(ts);
std::string host_port = std::to_string(_listenPort);
std::string use_redis = (_rc != NULL) ? "true" : "false";
std::string use_redis = "false"; // (_rc != NULL) ? "true" : "false";
const char *values[10] = {
controllerId,
hostname,
@ -788,13 +788,13 @@ void PostgreSQL::heartbeat()
}
PQclear(res);
}
if (_rc != NULL) {
if (_rc->clusterMode) {
_cluster->zadd("controllers", controllerId, ts);
} else {
_redis->zadd("controllers", controllerId, ts);
}
}
// if (_rc != NULL) {
// if (_rc->clusterMode) {
// _cluster->zadd("controllers", controllerId, ts);
// } else {
// _redis->zadd("controllers", controllerId, ts);
// }
// }
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
@ -833,6 +833,7 @@ void PostgreSQL::membersDbWatcher()
void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
char buf[11] = {0};
std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf));
fprintf(stderr, "Listening to member stream: %s\n", cmd.c_str());
PGresult *res = PQexec(conn, cmd.c_str());
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
@ -874,7 +875,7 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
void PostgreSQL::_membersWatcher_Redis() {
char buf[11] = {0};
std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}";
fprintf(stderr, "Listening to member stream: %s\n", key.c_str());
while (_run == 1) {
try {
json tmp;
@ -1515,20 +1516,20 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
}
if (_rc != NULL) {
try {
std::string id = (*config)["id"];
std::string controllerId = _myAddressStr.c_str();
std::string key = "networks:{" + controllerId + "}";
if (_rc->clusterMode) {
_cluster->sadd(key, id);
} else {
_redis->sadd(key, id);
}
} catch (sw::redis::Error &e) {
fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
}
}
// if (_rc != NULL) {
// try {
// std::string id = (*config)["id"];
// std::string controllerId = _myAddressStr.c_str();
// std::string key = "networks:{" + controllerId + "}";
// if (_rc->clusterMode) {
// _cluster->sadd(key, id);
// } else {
// _redis->sadd(key, id);
// }
// } catch (sw::redis::Error &e) {
// fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
// }
// }
} else if (objtype == "_delete_network") {
try {
std::string networkId = (*config)["nwid"];
@ -1552,22 +1553,22 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what());
}
if (_rc != NULL) {
try {
std::string id = (*config)["id"];
std::string controllerId = _myAddressStr.c_str();
std::string key = "networks:{" + controllerId + "}";
if (_rc->clusterMode) {
_cluster->srem(key, id);
_cluster->del("network-nodes-online:{"+controllerId+"}:"+id);
} else {
_redis->srem(key, id);
_redis->del("network-nodes-online:{"+controllerId+"}:"+id);
}
} catch (sw::redis::Error &e) {
fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
}
}
// if (_rc != NULL) {
// try {
// std::string id = (*config)["id"];
// std::string controllerId = _myAddressStr.c_str();
// std::string key = "networks:{" + controllerId + "}";
// if (_rc->clusterMode) {
// _cluster->srem(key, id);
// _cluster->del("network-nodes-online:{"+controllerId+"}:"+id);
// } else {
// _redis->srem(key, id);
// _redis->del("network-nodes-online:{"+controllerId+"}:"+id);
// }
// } catch (sw::redis::Error &e) {
// fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
// }
// }
} else if (objtype == "_delete_member") {
try {
std::string memberId = (*config)["id"];
@ -1595,23 +1596,23 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what());
}
if (_rc != NULL) {
try {
std::string memberId = (*config)["id"];
std::string networkId = (*config)["nwid"];
std::string controllerId = _myAddressStr.c_str();
std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
if (_rc->clusterMode) {
_cluster->srem(key, memberId);
_cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
} else {
_redis->srem(key, memberId);
_redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
}
} catch (sw::redis::Error &e) {
fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
}
}
// if (_rc != NULL) {
// try {
// std::string memberId = (*config)["id"];
// std::string networkId = (*config)["nwid"];
// std::string controllerId = _myAddressStr.c_str();
// std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
// if (_rc->clusterMode) {
// _cluster->srem(key, memberId);
// _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
// } else {
// _redis->srem(key, memberId);
// _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
// }
// } catch (sw::redis::Error &e) {
// fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
// }
// }
} else {
fprintf(stderr, "ERROR: unknown objtype");
}
@ -1619,7 +1620,7 @@ void PostgreSQL::commitThread()
fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
PQfinish(conn);
@ -1634,11 +1635,11 @@ void PostgreSQL::onlineNotificationThread()
{
waitForReady();
if (_rc != NULL) {
onlineNotification_Redis();
} else {
// if (_rc != NULL) {
// onlineNotification_Redis();
// } else {
onlineNotification_Postgres();
}
// }
}
void PostgreSQL::onlineNotification_Postgres()
@ -1783,7 +1784,7 @@ void PostgreSQL::onlineNotification_Redis()
fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what());
#endif
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}

View File

@ -87,7 +87,9 @@ public:
std::string policyAlias() { return _policyAlias; }
/**
* Inform the bond about the path that its peer (owning object) just learned about
* Inform the bond about the path that its peer (owning object) just learned about.
* If the path is allowed to be used, it will be inducted into the bond on a trial
* period where link statistics will be collected to judge its quality.
*
* @param path Newly-learned Path which should now be handled by the Bond
* @param now Current time

View File

@ -1630,7 +1630,6 @@ public:
// Custom Policies
json &customBondingPolicies = settings["policies"];
for (json::iterator policyItr = customBondingPolicies.begin(); policyItr != customBondingPolicies.end();++policyItr) {
//fprintf(stderr, "\n\n--- (%s)\n", policyItr.key().c_str());
// Custom Policy
std::string customPolicyStr(policyItr.key());
json &customPolicy = policyItr.value();
@ -1684,7 +1683,6 @@ public:
// Policy-Specific link set
json &links = customPolicy["links"];
for (json::iterator linkItr = links.begin(); linkItr != links.end();++linkItr) {
//fprintf(stderr, "\t--- link (%s)\n", linkItr.key().c_str());
std::string linkNameStr(linkItr.key());
json &link = linkItr.value();
@ -1719,12 +1717,19 @@ public:
}
_node->bondController()->addCustomLink(customPolicyStr, new Link(linkNameStr,ipvPref,speed,linkMonitorInterval,upDelay,downDelay,enabled,linkMode,failoverToStr,alloc));
}
// TODO: This is dumb
std::string linkSelectMethodStr(OSUtils::jsonString(customPolicy["activeReselect"],"optimize"));
if (linkSelectMethodStr == "always") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_ALWAYS); }
if (linkSelectMethodStr == "better") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_BETTER); }
if (linkSelectMethodStr == "failure") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_FAILURE); }
if (linkSelectMethodStr == "optimize") { newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE); }
if (linkSelectMethodStr == "always") {
newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_ALWAYS);
}
if (linkSelectMethodStr == "better") {
newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_BETTER);
}
if (linkSelectMethodStr == "failure") {
newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_FAILURE);
}
if (linkSelectMethodStr == "optimize") {
newTemplateBond->setLinkSelectMethod(ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE);
}
if (newTemplateBond->getLinkSelectMethod() < 0 || newTemplateBond->getLinkSelectMethod() > 3) {
fprintf(stderr, "warning: invalid value (%s) for linkSelectMethod, assuming mode: always\n", linkSelectMethodStr.c_str());
}