bug fixes & debug code

This commit is contained in:
Grant Limberg 2021-06-02 13:46:54 -07:00
parent d2f1d05a06
commit 7427961fcf
No known key found for this signature in database
GPG Key ID: 2BA62CCABBB4095A
2 changed files with 62 additions and 18 deletions

View File

@ -34,6 +34,7 @@ static const int DB_MINIMUM_VERSION = 5;
static const char *_timestr()
{
time_t t = time(0);
char *ts = ctime(&t);
char *p = ts;
@ -74,10 +75,13 @@ using namespace ZeroTier;
MemberNotificationReceiver::MemberNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel)
: pqxx::notification_receiver(c, channel)
, _psql(p)
{}
{
fprintf(stderr, "initialize MemberNotificaitonReceiver\n");
}
void MemberNotificationReceiver::operator() (const std::string &payload, int packend_pid) {
fprintf(stderr, "Member Notification received: %s\n", payload.c_str());
json tmp(json::parse(payload));
json &ov = tmp["old_val"];
json &nv = tmp["new_val"];
@ -86,6 +90,7 @@ void MemberNotificationReceiver::operator() (const std::string &payload, int pac
if (nv.is_object()) newConfig = nv;
if (oldConfig.is_object() || newConfig.is_object()) {
_psql->_memberChanged(oldConfig,newConfig,(_psql->_ready>=2));
fprintf(stderr, "payload sent\n");
}
}
@ -93,10 +98,13 @@ void MemberNotificationReceiver::operator() (const std::string &payload, int pac
NetworkNotificationReceiver::NetworkNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel)
: pqxx::notification_receiver(c, channel)
, _psql(p)
{}
{
fprintf(stderr, "initialize NetworkNotificationReceiver\n");
}
void NetworkNotificationReceiver::operator() (const std::string &payload, int packend_pid) {
json tmp(json::parse(payload));
fprintf(stderr, "Network Notificaiton received: %s\n", payload.c_str());
json tmp(json::parse(payload));
json &ov = tmp["old_val"];
json &nv = tmp["new_val"];
json oldConfig, newConfig;
@ -104,6 +112,7 @@ void NetworkNotificationReceiver::operator() (const std::string &payload, int pa
if (nv.is_object()) newConfig = nv;
if (oldConfig.is_object() || newConfig.is_object()) {
_psql->_networkChanged(oldConfig,newConfig,(_psql->_ready>=2));
fprintf(stderr, "payload sent\n");
}
}
@ -220,6 +229,7 @@ bool PostgreSQL::isReady()
bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners)
{
fprintf(stderr, "PostgreSQL::save\n");
bool modified = false;
try {
if (!record.is_object())
@ -259,6 +269,7 @@ bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners)
void PostgreSQL::eraseNetwork(const uint64_t networkId)
{
fprintf(stderr, "PostgreSQL::eraseNetwork\n");
char tmp2[24];
waitForReady();
Utils::hex(networkId, tmp2);
@ -273,6 +284,7 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId)
void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
{
fprintf(stderr, "PostgreSQL::eraseMember\n");
char tmp2[24];
waitForReady();
std::pair<nlohmann::json,bool> tmp, nw;
@ -299,23 +311,35 @@ void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId,
void PostgreSQL::updateMemberOnLoad(const uint64_t networkId, const uint64_t memberId, nlohmann::json &member)
{
const uint64_t nwid = OSUtils::jsonIntHex(member["nwid"],0ULL);
const uint64_t id = OSUtils::jsonIntHex(member["id"],0ULL);
char nwids[24],ids[24];
OSUtils::ztsnprintf(nwids, sizeof(nwids), "%.16llx", nwid);
OSUtils::ztsnprintf(ids, sizeof(ids), "%.10llx", id);
fprintf(stderr, "PostgreSQL::updateMemberOnLoad: %s-%s\n", nwids, ids);
bool have_auth = false;
try {
auto c = _pool->borrow();
pqxx::work w(*c->c);
pqxx::row r = w.exec_params1("SELECT org.client_id, org.authorization_endpoint "
pqxx::result r = w.exec_params("SELECT org.client_id, org.authorization_endpoint "
"FROM ztc_network AS nw, ztc_org AS org "
"WHERE nw.id = $1 AND nw.sso_enabled = true AND org.owner_id = nw.owner_id", nwids);
std::string client_id = r[0].as<std::string>();
std::string authorization_endpoint = r[1].as<std::string>();
std::string client_id = "";
std::string authorization_endpoint = "";
if (r.size() == 1) {
// only one should exist
pqxx::row row = r.at(0);
client_id = row[0].as<std::string>();
authorization_endpoint = row[1].as<std::string>();
} else if (r.size() > 1) {
fprintf(stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n", nwids);
}
// no catch all else because we don't actually care if no records exist here. just continue as normal.
if ((!client_id.empty())&&(!authorization_endpoint.empty())) {
pqxx::row r2 = w.exec_params1(
@ -415,7 +439,13 @@ void PostgreSQL::initializeNetworks()
} else {
config["remoteTraceLevel"] = 0;
}
config["remoteTraceTarget"] = row[10].as<std::string>();
if (!row[10].is_null()) {
config["remoteTraceTarget"] = row[10].as<std::string>();
} else {
config["remoteTraceTarget"] = nullptr;
}
try {
config["revision"] = row[11].as<uint64_t>();
} catch (std::exception &e) {
@ -483,6 +513,7 @@ void PostgreSQL::initializeNetworks()
}
_networkChanged(empty, config, false);
fprintf(stderr, "Initialized Network: %s\n", nwid.c_str());
}
w.commit();
@ -511,16 +542,16 @@ void PostgreSQL::initializeMembers()
fprintf(stderr, "Initializing Members...\n");
auto c = _pool->borrow();
pqxx::work w{*c->c};
pqxx::result r = w.exec(
"SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, "
" EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, "
" EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, "
pqxx::result r = w.exec_params(
"SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, (EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000)::bigint, m.identity, "
" (EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
" (EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
" m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, "
" m.no_auto_assign_ips, m.revision "
"FROM ztc_member m "
"INNER JOIN ztc_network n "
" ON n.id = m.network_id "
"WHERE n.controller_id = $1 AND m.deleted = false");
"WHERE n.controller_id = $1 AND m.deleted = false", _myAddressStr);
for (auto row = r.begin(); row != r.end(); row++) {
json empty;
@ -612,6 +643,7 @@ void PostgreSQL::initializeMembers()
}
_memberChanged(empty, config, false);
fprintf(stderr, "Initiialzed member %s-%s", networkId.c_str(), memberId.c_str());
}
w.commit();
@ -795,6 +827,8 @@ void PostgreSQL::networksDbWatcher()
void PostgreSQL::_networksWatcher_Postgres() {
std::string stream = "network_" + _myAddressStr;
fprintf(stderr, "Listening to member stream: %s\n", stream.c_str());
auto c = _pool->borrow();
NetworkNotificationReceiver n(this, *c->c, stream);
@ -865,19 +899,20 @@ void PostgreSQL::_networksWatcher_Redis() {
void PostgreSQL::commitThread()
{
fprintf(stderr, "commitThread start\n");
std::pair<nlohmann::json,bool> qitem;
while(_commitQueue.get(qitem)&(_run == 1)) {
fprintf(stderr, "commitThread tick\n");
if (!qitem.first.is_object()) {
fprintf(stderr, "not an object\n");
continue;
}
try {
nlohmann::json *config = &(qitem.first);
const std::string objtype = (*config)["objtype"];
if (objtype == "member") {
fprintf(stderr, "commitThread: member\n");
try {
auto c = _pool->borrow();
pqxx::work w(*c->c);
@ -941,6 +976,7 @@ void PostgreSQL::commitThread()
assignments.push_back(addr);
}
if (ipAssignError) {
fprintf(stderr, "ipAssignError\n");
delete config;
config = nullptr;
continue;
@ -969,6 +1005,7 @@ void PostgreSQL::commitThread()
}
} else if (objtype == "network") {
try {
fprintf(stderr, "commitThread: network\n");
auto c = _pool->borrow();
pqxx::work w(*c->c);
@ -1061,6 +1098,7 @@ void PostgreSQL::commitThread()
id, targetAddr, targetBits, (via == "NULL" ? NULL : via.c_str()));
}
if (err) {
fprintf(stderr, "route add error\n");
w.abort();
_pool->unborrow(c);
delete config;
@ -1104,6 +1142,7 @@ void PostgreSQL::commitThread()
fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
}
} else if (objtype == "_delete_network") {
fprintf(stderr, "commitThread: delete network\n");
try {
auto c = _pool->borrow();
pqxx::work w(*c->c);
@ -1120,6 +1159,7 @@ void PostgreSQL::commitThread()
}
} else if (objtype == "_delete_member") {
fprintf(stderr, "commitThread: delete member\n");
try {
auto c = _pool->borrow();
pqxx::work w(*c->c);
@ -1179,7 +1219,7 @@ void PostgreSQL::onlineNotification_Postgres()
// using pqxx::stream_to would be a really nice alternative here, but
// unfortunately it doesn't support upserts.
fprintf(stderr, "online notification tick\n");
std::stringstream memberUpdate;
memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ";
bool firstRun = true;

View File

@ -66,7 +66,9 @@ class PostgreSQL;
class MemberNotificationReceiver : public pqxx::notification_receiver {
public:
MemberNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel);
virtual ~MemberNotificationReceiver() {}
virtual ~MemberNotificationReceiver() {
fprintf(stderr, "MemberNotificationReceiver destroyed\n");
}
virtual void operator() (const std::string &payload, int backendPid);
private:
@ -76,7 +78,9 @@ private:
class NetworkNotificationReceiver : public pqxx::notification_receiver {
public:
NetworkNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel);
virtual ~NetworkNotificationReceiver() {};
virtual ~NetworkNotificationReceiver() {
fprintf(stderr, "NetworkNotificationReceiver destroyed\n");
};
virtual void operator() (const std::string &payload, int packend_pid);
private: