diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 4687c9204..e55797fb9 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -34,6 +34,7 @@ static const int DB_MINIMUM_VERSION = 5; static const char *_timestr() { + time_t t = time(0); char *ts = ctime(&t); char *p = ts; @@ -74,10 +75,13 @@ using namespace ZeroTier; MemberNotificationReceiver::MemberNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel) : pqxx::notification_receiver(c, channel) , _psql(p) -{} +{ + fprintf(stderr, "initialize MemberNotificaitonReceiver\n"); +} void MemberNotificationReceiver::operator() (const std::string &payload, int packend_pid) { + fprintf(stderr, "Member Notification received: %s\n", payload.c_str()); json tmp(json::parse(payload)); json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; @@ -86,6 +90,7 @@ void MemberNotificationReceiver::operator() (const std::string &payload, int pac if (nv.is_object()) newConfig = nv; if (oldConfig.is_object() || newConfig.is_object()) { _psql->_memberChanged(oldConfig,newConfig,(_psql->_ready>=2)); + fprintf(stderr, "payload sent\n"); } } @@ -93,10 +98,13 @@ void MemberNotificationReceiver::operator() (const std::string &payload, int pac NetworkNotificationReceiver::NetworkNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel) : pqxx::notification_receiver(c, channel) , _psql(p) -{} +{ + fprintf(stderr, "initialize NetworkNotificationReceiver\n"); +} void NetworkNotificationReceiver::operator() (const std::string &payload, int packend_pid) { - json tmp(json::parse(payload)); + fprintf(stderr, "Network Notificaiton received: %s\n", payload.c_str()); + json tmp(json::parse(payload)); json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; json oldConfig, newConfig; @@ -104,6 +112,7 @@ void NetworkNotificationReceiver::operator() (const std::string &payload, int pa if (nv.is_object()) newConfig = nv; if (oldConfig.is_object() || newConfig.is_object()) { _psql->_networkChanged(oldConfig,newConfig,(_psql->_ready>=2)); + fprintf(stderr, "payload sent\n"); } } @@ -220,6 +229,7 @@ bool PostgreSQL::isReady() bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners) { + fprintf(stderr, "PostgreSQL::save\n"); bool modified = false; try { if (!record.is_object()) @@ -259,6 +269,7 @@ bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners) void PostgreSQL::eraseNetwork(const uint64_t networkId) { + fprintf(stderr, "PostgreSQL::eraseNetwork\n"); char tmp2[24]; waitForReady(); Utils::hex(networkId, tmp2); @@ -273,6 +284,7 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId) void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId) { + fprintf(stderr, "PostgreSQL::eraseMember\n"); char tmp2[24]; waitForReady(); std::pair tmp, nw; @@ -299,23 +311,35 @@ void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, void PostgreSQL::updateMemberOnLoad(const uint64_t networkId, const uint64_t memberId, nlohmann::json &member) { + const uint64_t nwid = OSUtils::jsonIntHex(member["nwid"],0ULL); const uint64_t id = OSUtils::jsonIntHex(member["id"],0ULL); char nwids[24],ids[24]; OSUtils::ztsnprintf(nwids, sizeof(nwids), "%.16llx", nwid); OSUtils::ztsnprintf(ids, sizeof(ids), "%.10llx", id); + fprintf(stderr, "PostgreSQL::updateMemberOnLoad: %s-%s\n", nwids, ids); bool have_auth = false; try { auto c = _pool->borrow(); pqxx::work w(*c->c); - pqxx::row r = w.exec_params1("SELECT org.client_id, org.authorization_endpoint " + pqxx::result r = w.exec_params("SELECT org.client_id, org.authorization_endpoint " "FROM ztc_network AS nw, ztc_org AS org " "WHERE nw.id = $1 AND nw.sso_enabled = true AND org.owner_id = nw.owner_id", nwids); - std::string client_id = r[0].as(); - std::string authorization_endpoint = r[1].as(); + std::string client_id = ""; + std::string authorization_endpoint = ""; + + if (r.size() == 1) { + // only one should exist + pqxx::row row = r.at(0); + client_id = row[0].as(); + authorization_endpoint = row[1].as(); + } else if (r.size() > 1) { + fprintf(stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n", nwids); + } + // no catch all else because we don't actually care if no records exist here. just continue as normal. if ((!client_id.empty())&&(!authorization_endpoint.empty())) { pqxx::row r2 = w.exec_params1( @@ -415,7 +439,13 @@ void PostgreSQL::initializeNetworks() } else { config["remoteTraceLevel"] = 0; } - config["remoteTraceTarget"] = row[10].as(); + + if (!row[10].is_null()) { + config["remoteTraceTarget"] = row[10].as(); + } else { + config["remoteTraceTarget"] = nullptr; + } + try { config["revision"] = row[11].as(); } catch (std::exception &e) { @@ -483,6 +513,7 @@ void PostgreSQL::initializeNetworks() } _networkChanged(empty, config, false); + fprintf(stderr, "Initialized Network: %s\n", nwid.c_str()); } w.commit(); @@ -511,16 +542,16 @@ void PostgreSQL::initializeMembers() fprintf(stderr, "Initializing Members...\n"); auto c = _pool->borrow(); pqxx::work w{*c->c}; - pqxx::result r = w.exec( - "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, " - " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, " - " EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, " + pqxx::result r = w.exec_params( + "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, (EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000)::bigint, m.identity, " + " (EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, " + " (EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000)::bigint, " " m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, " " m.no_auto_assign_ips, m.revision " "FROM ztc_member m " "INNER JOIN ztc_network n " " ON n.id = m.network_id " - "WHERE n.controller_id = $1 AND m.deleted = false"); + "WHERE n.controller_id = $1 AND m.deleted = false", _myAddressStr); for (auto row = r.begin(); row != r.end(); row++) { json empty; @@ -612,6 +643,7 @@ void PostgreSQL::initializeMembers() } _memberChanged(empty, config, false); + fprintf(stderr, "Initiialzed member %s-%s", networkId.c_str(), memberId.c_str()); } w.commit(); @@ -795,6 +827,8 @@ void PostgreSQL::networksDbWatcher() void PostgreSQL::_networksWatcher_Postgres() { std::string stream = "network_" + _myAddressStr; + fprintf(stderr, "Listening to member stream: %s\n", stream.c_str()); + auto c = _pool->borrow(); NetworkNotificationReceiver n(this, *c->c, stream); @@ -865,19 +899,20 @@ void PostgreSQL::_networksWatcher_Redis() { void PostgreSQL::commitThread() { + fprintf(stderr, "commitThread start\n"); std::pair qitem; while(_commitQueue.get(qitem)&(_run == 1)) { + fprintf(stderr, "commitThread tick\n"); if (!qitem.first.is_object()) { + fprintf(stderr, "not an object\n"); continue; } - - - try { nlohmann::json *config = &(qitem.first); const std::string objtype = (*config)["objtype"]; if (objtype == "member") { + fprintf(stderr, "commitThread: member\n"); try { auto c = _pool->borrow(); pqxx::work w(*c->c); @@ -941,6 +976,7 @@ void PostgreSQL::commitThread() assignments.push_back(addr); } if (ipAssignError) { + fprintf(stderr, "ipAssignError\n"); delete config; config = nullptr; continue; @@ -969,6 +1005,7 @@ void PostgreSQL::commitThread() } } else if (objtype == "network") { try { + fprintf(stderr, "commitThread: network\n"); auto c = _pool->borrow(); pqxx::work w(*c->c); @@ -1061,6 +1098,7 @@ void PostgreSQL::commitThread() id, targetAddr, targetBits, (via == "NULL" ? NULL : via.c_str())); } if (err) { + fprintf(stderr, "route add error\n"); w.abort(); _pool->unborrow(c); delete config; @@ -1104,6 +1142,7 @@ void PostgreSQL::commitThread() fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); } } else if (objtype == "_delete_network") { + fprintf(stderr, "commitThread: delete network\n"); try { auto c = _pool->borrow(); pqxx::work w(*c->c); @@ -1120,6 +1159,7 @@ void PostgreSQL::commitThread() } } else if (objtype == "_delete_member") { + fprintf(stderr, "commitThread: delete member\n"); try { auto c = _pool->borrow(); pqxx::work w(*c->c); @@ -1179,7 +1219,7 @@ void PostgreSQL::onlineNotification_Postgres() // using pqxx::stream_to would be a really nice alternative here, but // unfortunately it doesn't support upserts. - + fprintf(stderr, "online notification tick\n"); std::stringstream memberUpdate; memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES "; bool firstRun = true; diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index 63084f862..49fa9a6fa 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -66,7 +66,9 @@ class PostgreSQL; class MemberNotificationReceiver : public pqxx::notification_receiver { public: MemberNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel); - virtual ~MemberNotificationReceiver() {} + virtual ~MemberNotificationReceiver() { + fprintf(stderr, "MemberNotificationReceiver destroyed\n"); + } virtual void operator() (const std::string &payload, int backendPid); private: @@ -76,7 +78,9 @@ private: class NetworkNotificationReceiver : public pqxx::notification_receiver { public: NetworkNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel); - virtual ~NetworkNotificationReceiver() {}; + virtual ~NetworkNotificationReceiver() { + fprintf(stderr, "NetworkNotificationReceiver destroyed\n"); + }; virtual void operator() (const std::string &payload, int packend_pid); private: