diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index ea04f5519..1b97042c4 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -22,7 +22,7 @@ #include "EmbeddedNetworkController.hpp" #include "../version.h" -#include +#include #include using json = nlohmann::json; @@ -62,6 +62,67 @@ std::string join(const std::vector &elements, const char * const se } +namespace ZeroTier { + +class _MemberNotificationReceiver : public pqxx::notification_receiver +{ +private: + ZeroTier::PostgreSQL *_pgsql; + +public: + _MemberNotificationReceiver(pqxx::connection_base &c, const std::string &channel, ZeroTier::PostgreSQL *pgsql) + : pqxx::notification_receiver(c, channel) + , _pgsql(pgsql) + {} + + virtual void operator()(const std::string &payload, int backend_pid) + { + try { + json tmp(json::parse(payload)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _pgsql->_memberChanged(oldConfig,newConfig,_pgsql->isReady()); + } + } catch (std::exception &e) { + fprintf(stderr, "Exception parsing member notification: %s\n", e.what()); + } + } +}; + +class _NetworkNotificationReceiver : public pqxx::notification_receiver +{ +private: + ZeroTier::PostgreSQL *_pgsql; +public: + _NetworkNotificationReceiver(pqxx::connection_base &c, const std::string &channel, ZeroTier::PostgreSQL *pgsql) + : pqxx::notification_receiver(c, channel) + , _pgsql(pgsql) + {} + + virtual void operator()(const std::string &payload, int backend_pid) + { + try { + json tmp(json::parse(payload)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _pgsql->_networkChanged(oldConfig,newConfig,_pgsql->isReady()); + } + } catch (std::exception &e) { + fprintf(stderr, "Exception parsing member notification: %s\n", e.what()); + } // ignore bad records + } +}; + +} // namespace ZeroTier + using namespace ZeroTier; PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path) @@ -71,6 +132,7 @@ PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId , _run(1) , _waitNoticePrinted(false) { + fprintf(stderr, "PostgreSQL Constructed"); _connString = std::string(path); _readyLock.lock(); @@ -173,157 +235,101 @@ void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, } } -void PostgreSQL::initializeNetworks(PGconn *conn) +void PostgreSQL::initializeNetworks(pqxx::connection &conn) { try { - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); + if (!conn.is_open()) { + fprintf(stderr, "Bad Database Connection in initializeNetworks\n"); exit(1); } - const char *params[1] = { - _myAddressStr.c_str() - }; + pqxx::work w(conn); - PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, " - "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, " + pqxx::result r = w.exec("SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000 AS creation_time, capabilities, " + "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000 AS last_modified, mtu, multicast_limit, name, private, remote_trace_level, " "remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network " - "WHERE deleted = false AND controller_id = $1", - 1, - NULL, - params, - NULL, - NULL, - 0); - - if (PQresultStatus(res) != PGRES_TUPLES_OK) { - fprintf(stderr, "Networks Initialization Failed: %s", PQerrorMessage(conn)); - PQclear(res); - exit(1); - } + "WHERE deleted = false AND controller_id = '" + w.esc(_myAddressStr) + "'"); - int numRows = PQntuples(res); - for (int i = 0; i < numRows; ++i) { + + for (pqxx::result::const_iterator row = r.begin(); row != r.end(); ++row) { json empty; json config; - const char *nwidparam[1] = { - PQgetvalue(res, i, 0) - }; - - config["id"] = PQgetvalue(res, i, 0); - config["nwid"] = PQgetvalue(res, i, 0); + std::string nwid = row["id"].as(); + config["id"] = nwid; + config["nwid"] = nwid; try { - config["creationTime"] = std::stoull(PQgetvalue(res, i, 1)); - } catch (std::exception &e) { + config["creationTime"] = row["creation_time"].as(); + } catch(std::exception &e) { config["creationTime"] = 0ULL; - //fprintf(stderr, "Error converting creation time: %s\n", PQgetvalue(res, i, 1)); } - config["capabilities"] = json::parse(PQgetvalue(res, i, 2)); - config["enableBroadcast"] = (strcmp(PQgetvalue(res, i, 3),"t")==0); + config["capabilities"] = json::parse(row["capabilities"].as()); + config["enableBroadcast"] = row["enable_broadcast"].as(); try { - config["lastModified"] = std::stoull(PQgetvalue(res, i, 4)); + config["lastModified"] = row["last_modified"].as(); } catch (std::exception &e) { config["lastModified"] = 0ULL; - //fprintf(stderr, "Error converting last modified: %s\n", PQgetvalue(res, i, 4)); } try { - config["mtu"] = std::stoi(PQgetvalue(res, i, 5)); + config["mtu"] = row["mtu"].as(); } catch (std::exception &e) { config["mtu"] = 2800; } try { - config["multicastLimit"] = std::stoi(PQgetvalue(res, i, 6)); + config["multicastLimit"] = row["multicast_limit"].as(); } catch (std::exception &e) { config["multicastLimit"] = 64; } - config["name"] = PQgetvalue(res, i, 7); - config["private"] = (strcmp(PQgetvalue(res, i, 8),"t")==0); + config["name"] = row["name"].as(); + config["private"] = row["private"].as(); try { - config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9)); + config["remoteTraceLevel"] = row["remote_trace_level"].as(); } catch (std::exception &e) { config["remoteTraceLevel"] = 0; } - config["remoteTraceTarget"] = PQgetvalue(res, i, 10); + config["remoteTraceTarget"] = (row["remote_trace_target"].is_null() ? nullptr : row["remote_trace_target"].as()); try { - config["revision"] = std::stoull(PQgetvalue(res, i, 11)); + config["revision"] = row["revision"].as(); } catch (std::exception &e) { config["revision"] = 0ULL; //fprintf(stderr, "Error converting revision: %s\n", PQgetvalue(res, i, 11)); } - config["rules"] = json::parse(PQgetvalue(res, i, 12)); - config["tags"] = json::parse(PQgetvalue(res, i, 13)); - config["v4AssignMode"] = json::parse(PQgetvalue(res, i, 14)); - config["v6AssignMode"] = json::parse(PQgetvalue(res, i, 15)); + config["rules"] = json::parse(row["rules"].as()); + config["tags"] = json::parse(row["tags"].as()); + config["v4AssignMode"] = json::parse(row["v4_assign_mode"].as()); + config["v6AssignMode"] = json::parse(row["v6_assign_mode"].as()); config["objtype"] = "network"; config["ipAssignmentPools"] = json::array(); config["routes"] = json::array(); - PGresult *r2 = PQexecParams(conn, - "SELECT host(ip_range_start), host(ip_range_end) FROM ztc_network_assignment_pool WHERE network_id = $1", - 1, - NULL, - nwidparam, - NULL, - NULL, - 0); - - if (PQresultStatus(r2) != PGRES_TUPLES_OK) { - fprintf(stderr, "ERROR: Error retreiving IP pools for network: %s\n", PQresultErrorMessage(r2)); - PQclear(r2); - PQclear(res); - exit(1); - } - - int n = PQntuples(r2); - for (int j = 0; j < n; ++j) { + pqxx::work w2(conn); + pqxx::result res2 = w2.exec("SELECT host(ip_range_start) AS ip_range_start, host(ip_range_end) AS ip_range_end FROM ztc_network_assignment_pool WHERE network_id = '" + w2.esc(nwid) + "'"); + for(pqxx::result::const_iterator it = res2.begin(); it != res2.end(); ++it) { json ip; - ip["ipRangeStart"] = PQgetvalue(r2, j, 0); - ip["ipRangeEnd"] = PQgetvalue(r2, j, 1); - + ip["ipRangeStart"] = it["ip_range_start"].as(); + ip["ipRangeEnd"] = it["ip_range_end"].as(); config["ipAssignmentPools"].push_back(ip); } + w2.commit(); - PQclear(r2); - - r2 = PQexecParams(conn, - "SELECT host(address), bits, host(via) FROM ztc_network_route WHERE network_id = $1", - 1, - NULL, - nwidparam, - NULL, - NULL, - 0); - - if (PQresultStatus(r2) != PGRES_TUPLES_OK) { - fprintf(stderr, "ERROR: Error retreiving routes for network: %s\n", PQresultErrorMessage(r2)); - PQclear(r2); - PQclear(res); - exit(1); - } - - n = PQntuples(r2); - for (int j = 0; j < n; ++j) { - std::string addr = PQgetvalue(r2, j, 0); - std::string bits = PQgetvalue(r2, j, 1); - std::string via = PQgetvalue(r2, j, 2); + pqxx::work w3(conn); + pqxx::result res3 = w3.exec("SELECT host(address) AS address, bits, host(via) AS via FROM ztc_network_route WHERE network_id = '" + w3.esc(nwid) + "'"); + for(pqxx::result::const_iterator it = res3.begin(); it != res3.end(); ++it) { json route; - route["target"] = addr + "/" + bits; - - if (via == "NULL") { + route["target"] = it["address"].as() + "/" + it["bits"].as(); + if (route["via"].is_null()) { route["via"] = nullptr; } else { - route["via"] = via; + route["via"] = it["via"].as(); } config["routes"].push_back(route); } + w3.commit(); - PQclear(r2); - _networkChanged(empty, config, false); } - PQclear(res); + w.commit(); if (++this->_ready == 2) { if (_waitNoticePrinted) { @@ -337,147 +343,115 @@ void PostgreSQL::initializeNetworks(PGconn *conn) } } -void PostgreSQL::initializeMembers(PGconn *conn) +void PostgreSQL::initializeMembers(pqxx::connection &conn) { try { - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); + if (!conn.is_open()) { + fprintf(stderr, "Bad Database Connection in initializeMembers\n"); exit(1); } - const char *params[1] = { - _myAddressStr.c_str() - }; - - PGresult *res = PQexecParams(conn, - "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, " - " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, " - " EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, " - " m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, " - " m.no_auto_assign_ips, m.revision " + pqxx::work w(conn); + pqxx::result res = w.exec( + "SELECT m.id AS id, m.network_id AS network_id, m.active_bridge AS active_bridge, " + " m.authorized AS authorized, m.capabilities AS capabilities, " + " EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000 AS creation_time, m.identity AS identity, " + " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000 AS last_authorized_time, " + " EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000 AS last_deauthorized_time, " + " m.remote_trace_level AS remote_trace_level, m.remote_trace_target AS remote_trace_target, " + " m.tags AS tags, m.v_major AS v_major, m.v_minor AS v_minor, m.v_rev AS v_rev, " + " m.v_proto AS v_proto, m.no_auto_assign_ips AS no_auto_assign_ips, m.revision AS revision" "FROM ztc_member m " "INNER JOIN ztc_network n " " ON n.id = m.network_id " - "WHERE n.controller_id = $1 AND m.deleted = false", - 1, - NULL, - params, - NULL, - NULL, - 0); + "WHERE n.controller_id = '" + w.esc(_myAddressStr) + "' AND m.deleted = false" + ); - if (PQresultStatus(res) != PGRES_TUPLES_OK) { - fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn)); - PQclear(res); - exit(1); - } - - int numRows = PQntuples(res); - for (int i = 0; i < numRows; ++i) { + for(pqxx::result::const_iterator row = res.begin(); row != res.end(); ++row) { json empty; json config; - std::string memberId(PQgetvalue(res, i, 0)); - std::string networkId(PQgetvalue(res, i, 1)); - std::string ctime = PQgetvalue(res, i, 5); + std::string networkId = row["network_id"].as(); + std::string memberId = row["id"].as(); + config["id"] = memberId; config["nwid"] = networkId; - config["activeBridge"] = (strcmp(PQgetvalue(res, i, 2), "t") == 0); - config["authorized"] = (strcmp(PQgetvalue(res, i, 3), "t") == 0); + config["activeBridge"] = row["active_bridge"].as(); + config["authorized"] = row["authorized"].as(); try { - config["capabilities"] = json::parse(PQgetvalue(res, i, 4)); - } catch (std::exception &e) { + config["capabilities"] = json::parse(row["capabilities"].as()); + } catch(std::exception &e) { config["capabilities"] = json::array(); } try { - config["creationTime"] = std::stoull(PQgetvalue(res, i, 5)); - } catch (std::exception &e) { + config["creationTime"] = row["creation_time"].as(); + } catch(std::exception &e) { config["creationTime"] = 0ULL; - //fprintf(stderr, "Error upding creation time (member): %s\n", PQgetvalue(res, i, 5)); } - config["identity"] = PQgetvalue(res, i, 6); + config["identity"] = row["identity"].as(); try { - config["lastAuthorizedTime"] = std::stoull(PQgetvalue(res, i, 7)); + config["lastAuthorizedTime"] = row["last_authorized_time"].as(); } catch(std::exception &e) { config["lastAuthorizedTime"] = 0ULL; - //fprintf(stderr, "Error updating last auth time (member): %s\n", PQgetvalue(res, i, 7)); } try { - config["lastDeauthorizedTime"] = std::stoull(PQgetvalue(res, i, 8)); - } catch( std::exception &e) { + config["lastDeauthorizedTime"] = row["last_deauthorized_time"].as(); + } catch(std::exception &e) { config["lastDeauthorizedTime"] = 0ULL; - //fprintf(stderr, "Error updating last deauth time (member): %s\n", PQgetvalue(res, i, 8)); } try { - config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9)); - } catch (std::exception &e) { + config["remoteTraceLevel"] = row["remote_trace_level"].as(); + } catch(std::exception &e) { config["remoteTraceLevel"] = 0; } - config["remoteTraceTarget"] = PQgetvalue(res, i, 10); + config["remoteTraceTarget"] = (row["remote_trace_target"].is_null() ? nullptr : row["remote_trace_target"].as()); try { - config["tags"] = json::parse(PQgetvalue(res, i, 11)); - } catch (std::exception &e) { + config["tags"] = json::parse(row["tags"].as()); + } catch(std::exception &e) { config["tags"] = json::array(); } try { - config["vMajor"] = std::stoi(PQgetvalue(res, i, 12)); + config["vMajor"] = row["v_major"].as(); } catch(std::exception &e) { config["vMajor"] = -1; } try { - config["vMinor"] = std::stoi(PQgetvalue(res, i, 13)); + config["vMinor"] = row["v_minor"].as(); } catch (std::exception &e) { config["vMinor"] = -1; } try { - config["vRev"] = std::stoi(PQgetvalue(res, i, 14)); + config["vRev"] = row["v_rev"].as(); } catch (std::exception &e) { config["vRev"] = -1; } try { - config["vProto"] = std::stoi(PQgetvalue(res, i, 15)); + config["vProto"] = row["v_proto"].as(); } catch (std::exception &e) { config["vProto"] = -1; } - config["noAutoAssignIps"] = (strcmp(PQgetvalue(res, i, 16), "t") == 0); + config["noAutoAssignIps"] = row["no_auto_assign_ips"].as(); try { - config["revision"] = std::stoull(PQgetvalue(res, i, 17)); + config["revision"] = row["revision"].as(); } catch (std::exception &e) { config["revision"] = 0ULL; //fprintf(stderr, "Error updating revision (member): %s\n", PQgetvalue(res, i, 17)); } config["objtype"] = "member"; config["ipAssignments"] = json::array(); - const char *p2[2] = { - memberId.c_str(), - networkId.c_str() - }; - PGresult *r2 = PQexecParams(conn, - "SELECT address FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", - 2, - NULL, - p2, - NULL, - NULL, - 0); - - if (PQresultStatus(r2) != PGRES_TUPLES_OK) { - fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn)); - PQclear(r2); - PQclear(res); - exit(1); - } - - int n = PQntuples(r2); - for (int j = 0; j < n; ++j) { - config["ipAssignments"].push_back(PQgetvalue(r2, j, 0)); + pqxx::work w2(conn); + pqxx::result r2 = w2.exec( + "SELECT address FROM ztc_member_ip_assignment WHERE member_id = '"+w2.esc(memberId)+"' AND network_id = '"+w2.esc(networkId)+"'" + ); + for(pqxx::result::const_iterator it = r2.begin(); it != r2.end(); ++it) { + config["ipAssignments"].push_back(it["address"].as()); } + w2.commit(); _memberChanged(empty, config, false); } - - PQclear(res); + w.commit(); if (++this->_ready == 2) { if (_waitNoticePrinted) { @@ -510,497 +484,237 @@ void PostgreSQL::heartbeat() const char *publicIdentity = publicId; const char *hostname = hostnameTmp; - PGconn *conn = PQconnectdb(_path.c_str()); - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); + fprintf(stderr, "Heartbeat connection opening"); + pqxx::connection conn(_connString); + if (!conn.is_open()) { + fprintf(stderr, "Connection to database failed: heartbeat\n"); exit(1); } + fprintf(stderr, "Heartbeat connection opened"); + conn.prepare("heartbeat", + "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build) " + "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8) " + "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " + "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " + "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev" + ); + while (_run == 1) { - if(PQstatus(conn) != CONNECTION_OK) { - PQfinish(conn); - conn = PQconnectdb(_path.c_str()); + try { + pqxx::work w(conn); + pqxx::result res = w.prepared("heartbeat")(controllerId)(hostname) + (OSUtils::now())(publicIdentity)(ZEROTIER_ONE_VERSION_MAJOR) + (ZEROTIER_ONE_VERSION_MINOR)(ZEROTIER_ONE_VERSION_REVISION) + (ZEROTIER_ONE_VERSION_BUILD).exec(); + w.commit(); + } catch (std::exception &e) { + fprintf(stderr, "Error inserting heartbeat: %s\n", e.what()); + exit(1); } - if (conn) { - std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR); - std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR); - std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION); - std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); - std::string now = std::to_string(OSUtils::now()); - const char *values[8] = { - controllerId, - hostname, - now.c_str(), - publicIdentity, - major.c_str(), - minor.c_str(), - rev.c_str(), - build.c_str() - }; - - PGresult *res = PQexecParams(conn, - "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build) " - "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8) " - "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " - "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " - "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev", - 8, // number of parameters - NULL, // oid field. ignore - values, // values for substitution - NULL, // lengths in bytes of each value - NULL, // binary? - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "Heartbeat Update Failed: %s\n", PQresultErrorMessage(res)); - } - PQclear(res); - } - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } - - PQfinish(conn); - conn = NULL; } void PostgreSQL::membersDbWatcher() { - PGconn *conn = PQconnectdb(_path.c_str()); - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); + try { + pqxx::connection conn(_connString); + if (!conn.is_open()) { + fprintf(stderr, "Connection to database failed: membersDbWatcher\n"); + exit(1); + } + + initializeMembers(conn); + + char buf[11] = {0}; + std::string cmd = "member_" + std::string(_myAddress.toString(buf)); + _MemberNotificationReceiver receiver(conn, cmd, this); + while(_run == 1) { + conn.await_notification(5, 0); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + conn.disconnect(); + } catch (std::exception &e) { + fprintf(stderr, "Exception in membersDbWatcher: %s\n", e.what()); exit(1); } - - initializeMembers(conn); - - char buf[11] = {0}; - std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); - PGresult *res = PQexec(conn, cmd.c_str()); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQfinish(conn); - exit(1); - } - - PQclear(res); res = NULL; - - while(_run == 1) { - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); - exit(-1); - } - PGnotify *notify = NULL; - PQconsumeInput(conn); - while ((notify = PQnotifies(conn)) != NULL) { - //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra); - - try { - json tmp(json::parse(notify->extra)); - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) oldConfig = ov; - if (nv.is_object()) newConfig = nv; - if (oldConfig.is_object() || newConfig.is_object()) { - _memberChanged(oldConfig,newConfig,(this->_ready>=2)); - } - } catch (...) {} // ignore bad records - - free(notify); - } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - PQfinish(conn); - conn = NULL; } void PostgreSQL::networksDbWatcher() { - PGconn *conn = PQconnectdb(_path.c_str()); - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); + try { + pqxx::connection conn(_connString); + if (!conn.is_open()) { + fprintf(stderr, "Connection to database failed: networksDbWatcher\n"); + exit(1); + } + + initializeNetworks(conn); + + char buf[11] = {0}; + std::string cmd = "network_" + std::string(_myAddress.toString(buf)); + _NetworkNotificationReceiver receiver(conn, cmd, this); + while(_run == 1) { + conn.await_notification(5, 0); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } catch(std::exception &e) { + fprintf(stderr, "Exception in networksDbWatcher: %s\n", e.what()); exit(1); } - - initializeNetworks(conn); - - char buf[11] = {0}; - std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); - PGresult *res = PQexec(conn, cmd.c_str()); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQfinish(conn); - exit(1); - } - - PQclear(res); res = NULL; - - while(_run == 1) { - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); - exit(-1); - } - PGnotify *notify = NULL; - PQconsumeInput(conn); - while ((notify = PQnotifies(conn)) != NULL) { - //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra); - try { - json tmp(json::parse(notify->extra)); - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) oldConfig = ov; - if (nv.is_object()) newConfig = nv; - if (oldConfig.is_object()||newConfig.is_object()) { - _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); - } - } catch (...) {} // ignore bad records - free(notify); - } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - PQfinish(conn); - conn = NULL; } void PostgreSQL::commitThread() { - PGconn *conn = PQconnectdb(_path.c_str()); - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); + pqxx::connection conn(_connString); + if (!conn.is_open()) { + fprintf(stderr, "ERROR: Connection to database failed: commitThread\n"); exit(1); } + conn.prepare("insert_member", + "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, " + "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, " + "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) " + "VALUES ($1, $2, $3, $4, $5, $6, " + "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), " + "$9, $10, (CASE WHEN $11='' THEN NULL ELSE $1 END), $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET " + "active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, " + "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, " + "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, " + "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, " + "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, " + "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto"); + conn.prepare("delete_ip_assignments", + "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2"); + conn.prepare("insert_ip_assignments", + "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)"); + + conn.prepare("update_network", + "UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, " + "last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, " + "remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, " + "tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 " + "WHERE id = $1"); + conn.prepare("delete_network_ip_pool", + "DELETE FROM ztc_network_assignment_pool WHERE network_id = $1"); + conn.prepare("insert_network_ip_pool", + "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) VALUES ($1, $2, $3)"); + conn.prepare("delete_network_route", "DELETE FROM ztc_network_route WHERE network_id = $1"); + conn.prepare("insert_network_route", "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)"); + conn.prepare("delete_network", "UPDATE ztc_network SET deleted = true WHERE id = $1"); + conn.prepare("delete_member", "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2"); + json *config = nullptr; while(_commitQueue.get(config)&(_run == 1)) { if (!config) { continue; } - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); - delete config; - exit(1); - } try { const std::string objtype = (*config)["objtype"]; if (objtype == "member") { + std::string memberId = (*config)["id"]; + std::string networkId = (*config)["nwid"]; + std::string target(""); + if (!(*config)["remoteTraceTarget"].is_null()) { + target = (*config)["remoteTraceTarget"]; + } + std::string identity = (*config)["identity"]; + try { - std::string memberId = (*config)["id"]; - std::string networkId = (*config)["nwid"]; - std::string identity = (*config)["identity"]; - std::string target = "NULL"; - - if (!(*config)["remoteTraceTarget"].is_null()) { - target = (*config)["remoteTraceTarget"]; - } - std::string caps = OSUtils::jsonDump((*config)["capabilities"], -1); - std::string lastAuthTime = std::to_string((long long)(*config)["lastAuthorizedTime"]); - std::string lastDeauthTime = std::to_string((long long)(*config)["lastDeauthorizedTime"]); - std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); - std::string rev = std::to_string((unsigned long long)(*config)["revision"]); std::string tags = OSUtils::jsonDump((*config)["tags"], -1); - std::string vmajor = std::to_string((int)(*config)["vMajor"]); - std::string vminor = std::to_string((int)(*config)["vMinor"]); - std::string vrev = std::to_string((int)(*config)["vRev"]); - std::string vproto = std::to_string((int)(*config)["vProto"]); - const char *values[19] = { - memberId.c_str(), - networkId.c_str(), - ((*config)["activeBridge"] ? "true" : "false"), - ((*config)["authorized"] ? "true" : "false"), - caps.c_str(), - identity.c_str(), - lastAuthTime.c_str(), - lastDeauthTime.c_str(), - ((*config)["noAutoAssignIps"] ? "true" : "false"), - rtraceLevel.c_str(), - (target == "NULL") ? NULL : target.c_str(), - rev.c_str(), - tags.c_str(), - vmajor.c_str(), - vminor.c_str(), - vrev.c_str(), - vproto.c_str() - }; - PGresult *res = PQexecParams(conn, - "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, " - "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, " - "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) " - "VALUES ($1, $2, $3, $4, $5, $6, " - "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), " - "$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET " - "active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, " - "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, " - "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, " - "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, " - "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, " - "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto", - 17, - NULL, - values, - NULL, - NULL, - 0); + pqxx::work w(conn); + pqxx::result res = w.prepared("insert_member")(memberId)(networkId) + ((bool)(*config)["activeBridge"])((bool)(*config)["authorized"])(caps) + (identity)((long long)(*config)["lastAuthorizedTime"])((long long)(*config)["lastDeauthorizedTime"]) + ((bool)(*config)["noAutoAssignIps"])((int)(*config)["remoteTraceLevel"]) + (target, !target.empty()) + ((int)(*config)["revision"])(tags)((int)(*config)["vMajor"])((int)(*config)["vMinor"]) + ((int)(*config)["vRev"])((int)(*config)["vProto"]).exec(); + w.commit(); + } catch (std::exception &e) { + fprintf(stderr, "Exception upserting member: %s\n", e.what()); + delete config; + config = nullptr; + continue; + } + + try { + pqxx::work w(conn); + pqxx::result res = w.prepared("delete_ip_assignments")(memberId)(networkId).exec(); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res)); - fprintf(stderr, "%s", OSUtils::jsonDump(*config, 2).c_str()); - PQclear(res); - delete config; - config = nullptr; - continue; - } - - PQclear(res); - - res = PQexec(conn, "BEGIN"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error beginning transaction: %s\n", PQresultErrorMessage(res)); - PQclear(res); - delete config; - config = nullptr; - continue; - } - - PQclear(res); - - const char *v2[2] = { - memberId.c_str(), - networkId.c_str() - }; - - res = PQexecParams(conn, - "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", - 2, - NULL, - v2, - NULL, - NULL, - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK"));; - delete config; - config = nullptr; - continue; - } - - PQclear(res); - for (auto i = (*config)["ipAssignments"].begin(); i != (*config)["ipAssignments"].end(); ++i) { std::string addr = *i; - const char *v3[3] = { - memberId.c_str(), - networkId.c_str(), - addr.c_str() - }; - - res = PQexecParams(conn, - "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)", - 3, - NULL, - v3, - NULL, - NULL, - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error setting IP addresses for member: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - continue; - } + pqxx::result res2 = w.prepared("insert_ip_assignments")(memberId)(networkId)(addr).exec(); } - - res = PQexec(conn, "COMMIT"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error committing ip address data: %s\n", PQresultErrorMessage(res)); - } - - PQclear(res); - - const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); - const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL); - if (nwidInt && memberidInt) { - nlohmann::json nwOrig; - nlohmann::json memOrig; - - nlohmann::json memNew(*config); - - get(nwidInt, nwOrig, memberidInt, memOrig); - - _memberChanged(memOrig, memNew, (this->_ready>=2)); - } else { - fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt); - } - + w.commit(); } catch (std::exception &e) { - fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); + fprintf(stderr, "Error assigning member IP addresses: %s\n", e.what()); + delete config; + config = nullptr; + } + + const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); + const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL); + if (nwidInt && memberidInt) { + nlohmann::json nwOrig; + nlohmann::json memOrig; + + nlohmann::json memNew(*config); + + get(nwidInt, nwOrig, memberidInt, memOrig); + + _memberChanged(memOrig, memNew, (this->_ready>=2)); + } else { + fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt); } } else if (objtype == "network") { + std::string id = (*config)["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string name = (*config)["name"]; + std::string rulesSource = (*config)["rulesSource"]; + std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1); + std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); + std::string rules = OSUtils::jsonDump((*config)["rules"], -1); + std::string tags = OSUtils::jsonDump((*config)["tags"], -1); + std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1); + std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1); + std::string target = ""; + if (!(*config)["remoteTraceTarget"].is_null()) { + target = (*config)["remoteTraceTarget"]; + } try { - std::string id = (*config)["id"]; - std::string controllerId = _myAddressStr.c_str(); - std::string name = (*config)["name"]; - std::string remoteTraceTarget("NULL"); - if (!(*config)["remoteTraceTarget"].is_null()) { - remoteTraceTarget = (*config)["remoteTraceTarget"]; - } - std::string rulesSource = (*config)["rulesSource"]; - std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1); - std::string now = std::to_string(OSUtils::now()); - std::string mtu = std::to_string((int)(*config)["mtu"]); - std::string mcastLimit = std::to_string((int)(*config)["multicastLimit"]); - std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); - std::string rules = OSUtils::jsonDump((*config)["rules"], -1); - std::string tags = OSUtils::jsonDump((*config)["tags"], -1); - std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1); - std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1); - bool enableBroadcast = (*config)["enableBroadcast"]; - bool isPrivate = (*config)["private"]; + pqxx::work w(conn); + pqxx::result res = w.prepared("update_network")(id)(controllerId)(caps)((bool)(*config)["enableBroadcast"]) + (OSUtils::now())((int)(*config)["mtu"])((int)(*config)["multicastLimit"])(name)((bool)(*config)["private"]) + ((int)(*config)["remoteTraceLevel"])(target, !target.empty()) + (rules)(rulesSource)(tags)(v4mode)(v6mode).exec(); + w.commit(); + } catch (std::exception &e) { + fprintf(stderr, "Error updating network config: %s\n", e.what()); + delete config; + config = nullptr; + continue; + } - const char *values[16] = { - id.c_str(), - controllerId.c_str(), - caps.c_str(), - enableBroadcast ? "true" : "false", - now.c_str(), - mtu.c_str(), - mcastLimit.c_str(), - name.c_str(), - isPrivate ? "true" : "false", - rtraceLevel.c_str(), - (remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()), - rules.c_str(), - rulesSource.c_str(), - tags.c_str(), - v4mode.c_str(), - v6mode.c_str(), - }; - - PGresult *res = PQexecParams(conn, - "UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, " - "last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, " - "remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, " - "tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 " - "WHERE id = $1", - 16, - NULL, - values, - NULL, - NULL, - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res)); - PQclear(res); - delete config; - config = nullptr; - continue; - } - - PQclear(res); - - res = PQexec(conn, "BEGIN"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res)); - PQclear(res); - delete config; - config = nullptr; - continue; - } - - PQclear(res); - - const char *params[1] = { - id.c_str() - }; - res = PQexecParams(conn, - "DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", - 1, - NULL, - params, - NULL, - NULL, - 0); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - delete config; - config = nullptr; - continue; - } - - PQclear(res); + try { + pqxx::work w(conn); + pqxx::result res = w.prepared("delete_network_ip_pool")(id).exec(); auto pool = (*config)["ipAssignmentPools"]; - bool err = false; for (auto i = pool.begin(); i != pool.end(); ++i) { std::string start = (*i)["ipRangeStart"]; std::string end = (*i)["ipRangeEnd"]; - const char *p[3] = { - id.c_str(), - start.c_str(), - end.c_str() - }; - res = PQexecParams(conn, - "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) " - "VALUES ($1, $2, $3)", - 3, - NULL, - p, - NULL, - NULL, - 0); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res)); - PQclear(res); - err = true; - break; - } - PQclear(res); - } - if (err) { - PQclear(PQexec(conn, "ROLLBACK")); - delete config; - config = nullptr; - continue; + pqxx::result r2 = w.prepared("insert_nework_ip_pool")(id)(start)(end).exec(); } - res = PQexecParams(conn, - "DELETE FROM ztc_network_route WHERE network_id = $1", - 1, - NULL, - params, - NULL, - NULL, - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - delete config; - config = nullptr; - continue; - } + pqxx::result res2 = w.prepared("delete_network_route")(id).exec(); auto routes = (*config)["routes"]; - err = false; for (auto i = routes.begin(); i != routes.end(); ++i) { std::string t = (*i)["target"]; std::vector target; @@ -1013,86 +727,42 @@ void PostgreSQL::commitThread() continue; } std::string targetAddr = target[0]; - std::string targetBits = target[1]; - std::string via = "NULL"; + int targetBits = std::stoi(target[1]); + std::string via = ""; if (!(*i)["via"].is_null()) { via = (*i)["via"]; } - - const char *p[4] = { - id.c_str(), - targetAddr.c_str(), - targetBits.c_str(), - (via == "NULL" ? NULL : via.c_str()), - }; - - res = PQexecParams(conn, - "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)", - 4, - NULL, - p, - NULL, - NULL, - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res)); - PQclear(res); - err = true; - break; - } - PQclear(res); - } - if (err) { - PQclear(PQexec(conn, "ROLLBACK")); - delete config; - config = nullptr; - continue; + + pqxx::result res3 = w.prepared("insert_network_route")(id)(targetAddr)(targetBits) + (via, !via.empty()).exec(); } - res = PQexec(conn, "COMMIT"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res)); - } - PQclear(res); - - const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); - if (nwidInt) { - nlohmann::json nwOrig; - nlohmann::json nwNew(*config); - - get(nwidInt, nwOrig); - - _networkChanged(nwOrig, nwNew, true); - } else { - fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt); - } + w.commit(); } catch (std::exception &e) { - fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); + fprintf(stderr, "Error updating network IP pool: %s\n", e.what()); + } + + const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); + if (nwidInt) { + nlohmann::json nwOrig; + nlohmann::json nwNew(*config); + + get(nwidInt, nwOrig); + + _networkChanged(nwOrig, nwNew, true); + } else { + fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt); } } else if (objtype == "trace") { fprintf(stderr, "ERROR: Trace not yet implemented"); } else if (objtype == "_delete_network") { try { std::string networkId = (*config)["nwid"]; - const char *values[1] = { - networkId.c_str() - }; - PGresult * res = PQexecParams(conn, - "UPDATE ztc_network SET deleted = true WHERE id = $1", - 1, - NULL, - values, - NULL, - NULL, - 0); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error deleting network: %s\n", PQresultErrorMessage(res)); - } - - PQclear(res); + pqxx::work w(conn); + pqxx::result res = w.prepared("delete_network")(networkId).exec(); + w.commit(); } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); } @@ -1101,25 +771,9 @@ void PostgreSQL::commitThread() std::string memberId = (*config)["id"]; std::string networkId = (*config)["nwid"]; - const char *values[2] = { - memberId.c_str(), - networkId.c_str() - }; - - PGresult *res = PQexecParams(conn, - "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2", - 2, - NULL, - values, - NULL, - NULL, - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error deleting member: %s\n", PQresultErrorMessage(res)); - } - - PQclear(res); + pqxx::work w(conn); + pqxx::result res = w.prepared("delete_member")(memberId)(networkId).exec(); + w.commit(); } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); } @@ -1132,18 +786,15 @@ void PostgreSQL::commitThread() delete config; config = nullptr; - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - - PQfinish(conn); } void PostgreSQL::onlineNotificationThread() { - PGconn *conn = PQconnectdb(_path.c_str()); - if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); - PQfinish(conn); + pqxx::connection conn(_connString); + if(!conn.is_open()) { + fprintf(stderr, "Connection to database failed: onlineNotificationThread\n"); exit(1); } _connected = 1; @@ -1152,11 +803,6 @@ void PostgreSQL::onlineNotificationThread() std::unordered_map< std::pair,int64_t,_PairHasher > lastOnlineCumulative; while (_run == 1) { - if (PQstatus(conn) != CONNECTION_OK) { - fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); - exit(-1); - } - // map used to send notifications to front end std::unordered_map> updateMap; @@ -1166,230 +812,144 @@ void PostgreSQL::onlineNotificationThread() lastOnline.swap(_lastOnline); } - PGresult *res = NULL; - int qCount = 0; + try { + pqxx::work w(conn); + pqxx::pipeline p(w, "Member Update Pipeline"); + for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { + uint64_t nwid_i = i->first.first; + char nwidTmp[64]; + char memTmp[64]; + char ipTmp[64]; + OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); + OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); - res = PQexec(conn, "BEGIN"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); - PQclear(res); - exit(1); - } - PQclear(res); - - for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { - uint64_t nwid_i = i->first.first; - char nwidTmp[64]; - char memTmp[64]; - char ipTmp[64]; - OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); - OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); - - auto found = _networks.find(nwid_i); - if (found == _networks.end()) { - continue; // skip members trying to join non-existant networks - } - - lastOnlineCumulative[i->first] = i->second.first; - - - std::string networkId(nwidTmp); - std::string memberId(memTmp); - - std::vector &members = updateMap[networkId]; - members.push_back(memberId); - - int64_t ts = i->second.first; - std::string ipAddr = i->second.second.toIpString(ipTmp); - std::string timestamp = std::to_string(ts); - - const char *values[4] = { - networkId.c_str(), - memberId.c_str(), - (ipAddr.empty() ? NULL : ipAddr.c_str()), - timestamp.c_str(), - }; - - res = PQexecParams(conn, - "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ($1, $2, $3, TO_TIMESTAMP($4::double precision/1000)) " - "ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated", - 4, // number of parameters - NULL, // oid field. ignore - values, // values for substitution - NULL, // lengths in bytes of each value - NULL, - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "Error on Member Status upsert: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - continue; - } - - PQclear(res); - - if ((++qCount) == 1024) { - res = PQexec(conn, "COMMIT"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error on commit (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - exit(1); + auto found = _networks.find(nwid_i); + if (found == _networks.end()) { + continue; // skip members trying to join non-existant networks } - PQclear(res); - res = PQexec(conn, "BEGIN"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error on BEGIN (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); - PQclear(res); - exit(1); - } - PQclear(res); - qCount = 0; + lastOnlineCumulative[i->first] = i->second.first; + + + std::string networkId(nwidTmp); + std::string memberId(memTmp); + + std::vector &members = updateMap[networkId]; + members.push_back(memberId); + + int64_t ts = i->second.first; + std::string ipAddr = i->second.second.toIpString(ipTmp); + std::string timestamp = std::to_string(ts); + + std::stringstream ss; + ss << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES (" + << "'" << w.esc(networkId) << "', " + << "'" << w.esc(memberId) << "', " + << "'" << w.esc(ipAddr) << "', " + << timestamp << ") " + << "ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated"; + p.insert(ss.str()); } + p.complete(); + w.commit(); + } catch (std::exception &e) { + fprintf(stderr, "Error updating member status: %s\n", e.what()); } - res = PQexec(conn, "COMMIT"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error on commit (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - exit(1); - } - PQclear(res); - const int64_t now = OSUtils::now(); - if ((now - lastUpdatedNetworkStatus) > 10000) { - lastUpdatedNetworkStatus = now; + try { + const int64_t now = OSUtils::now(); + if ((now - lastUpdatedNetworkStatus) > 10000) { + pqxx::work w(conn); + pqxx::pipeline p(w, "Network Update Pipeline"); + lastUpdatedNetworkStatus = now; - std::vector>> networks; - { - std::lock_guard l(_networks_l); - for (auto i = _networks.begin(); i != _networks.end(); ++i) { - networks.push_back(*i); - } - } - - int nCount = 0; - - res = PQexec(conn, "BEGIN"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); - PQclear(res); - exit(1); - } - PQclear(res); - for (auto i = networks.begin(); i != networks.end(); ++i) { - char tmp[64]; - Utils::hex(i->first, tmp); - - std::string networkId(tmp); - - std::vector &_notUsed = updateMap[networkId]; - (void)_notUsed; - - uint64_t authMemberCount = 0; - uint64_t totalMemberCount = 0; - uint64_t onlineMemberCount = 0; - uint64_t bridgeCount = 0; - uint64_t ts = now; + std::vector>> networks; { - std::lock_guard l2(i->second->lock); - authMemberCount = i->second->authorizedMembers.size(); - totalMemberCount = i->second->members.size(); - bridgeCount = i->second->activeBridgeMembers.size(); - for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) { - auto lo = lastOnlineCumulative.find(std::pair(i->first, m->first)); - if (lo != lastOnlineCumulative.end()) { - if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) { - ++onlineMemberCount; - } else { - lastOnlineCumulative.erase(lo); + std::lock_guard l(_networks_l); + for (auto i = _networks.begin(); i != _networks.end(); ++i) { + networks.push_back(*i); + } + } + + for (auto i = networks.begin(); i != networks.end(); ++i) { + char tmp[64]; + Utils::hex(i->first, tmp); + + std::string networkId(tmp); + + std::vector &_notUsed = updateMap[networkId]; + (void)_notUsed; + + uint64_t authMemberCount = 0; + uint64_t totalMemberCount = 0; + uint64_t onlineMemberCount = 0; + uint64_t bridgeCount = 0; + uint64_t ts = now; + { + std::lock_guard l2(i->second->lock); + authMemberCount = i->second->authorizedMembers.size(); + totalMemberCount = i->second->members.size(); + bridgeCount = i->second->activeBridgeMembers.size(); + for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) { + auto lo = lastOnlineCumulative.find(std::pair(i->first, m->first)); + if (lo != lastOnlineCumulative.end()) { + if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) { + ++onlineMemberCount; + } else { + lastOnlineCumulative.erase(lo); + } } } } + + std::string bc = std::to_string(bridgeCount); + std::string amc = std::to_string(authMemberCount); + std::string omc = std::to_string(onlineMemberCount); + std::string tmc = std::to_string(totalMemberCount); + std::string timestamp = std::to_string(ts); + + std::stringstream ss; + ss << "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, " + << "online_member_count, total_member_count, last_modified) VALUES (" + << "'" << w.esc(networkId) << "', " + << bridgeCount << ", " + << authMemberCount << ", " + << onlineMemberCount << ", " + << totalMemberCount << ", " + << "TO_TIMESTAMP(" << ts << "::double precision/1000)) " + << "ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " + << "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, " + << "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified"; + p.insert(ss.str()); } - - std::string bc = std::to_string(bridgeCount); - std::string amc = std::to_string(authMemberCount); - std::string omc = std::to_string(onlineMemberCount); - std::string tmc = std::to_string(totalMemberCount); - std::string timestamp = std::to_string(ts); - const char *values[6] = { - networkId.c_str(), - bc.c_str(), - amc.c_str(), - omc.c_str(), - tmc.c_str(), - timestamp.c_str() - }; - - res = PQexecParams(conn, "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, " - "online_member_count, total_member_count, last_modified) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6::double precision/1000)) " - "ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " - "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, " - "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified", - 6, - NULL, - values, - NULL, - NULL, - 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error on Network Status upsert (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - exit(1); - } - - if ((++nCount) == 1024) { - res = PQexec(conn, "COMMIT"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error on COMMIT (onlineNotificationThread): %s\n" , PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - exit(1); - } - - res = PQexec(conn, "BEGIN"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); - PQclear(res); - exit(1); - } - - nCount = 0; - } - } - res = PQexec(conn, "COMMIT"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error on COMMIT (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQclear(PQexec(conn, "ROLLBACK")); - exit(1); + p.complete(); + w.commit(); } + } catch (std::exception &e) { + fprintf(stderr, "Error updating network status: %s\n", e.what()); } - for (auto it = updateMap.begin(); it != updateMap.end(); ++it) { - std::string networkId = it->first; - std::vector members = it->second; - std::stringstream queryBuilder; + try { + pqxx::work w(conn); + pqxx::pipeline p(w, "Notification Sender"); + for (auto it = updateMap.begin(); it != updateMap.end(); ++it) { + std::string networkId = it->first; + std::vector members = it->second; + std::stringstream queryBuilder; - std::string membersStr = ::join(members, ","); + std::string membersStr = ::join(members, ","); - queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'"; - std::string query = queryBuilder.str(); + queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'"; + std::string query = queryBuilder.str(); - PGresult *res = PQexec(conn,query.c_str()); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res)); + p.insert(query); } - PQclear(res); + p.complete(); + w.commit(); + } catch (std::exception &e) { + fprintf(stderr, "Error notifying webapp: %s\n", e.what()); } - - std::this_thread::sleep_for(std::chrono::milliseconds(250)); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - PQfinish(conn); } #endif //ZT_CONTROLLER_USE_LIBPQ diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index 36fe8c9f2..86aac80d3 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -23,12 +23,16 @@ #include "DB.hpp" +#include + extern "C" { typedef struct pg_conn PGconn; } namespace ZeroTier { +class _MemberNotificationReceiver; +class _NetworkNotificationReceiver; /** * A controller database driver that talks to PostgreSQL @@ -56,8 +60,8 @@ protected: }; private: - void initializeNetworks(PGconn *conn); - void initializeMembers(PGconn *conn); + void initializeNetworks(pqxx::connection &conn); + void initializeMembers(pqxx::connection &conn); void heartbeat(); void membersDbWatcher(); void networksDbWatcher(); @@ -81,6 +85,9 @@ private: mutable std::mutex _readyLock; std::atomic _ready, _connected, _run; mutable volatile bool _waitNoticePrinted; + + friend class _MemberNotificationReceiver; + friend class _NetworkNotificationReceiver; }; } diff --git a/docker/Dockerfile b/docker/Dockerfile index ec9031915..f13a2bc6d 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -4,7 +4,7 @@ MAINTAINER Adam Ierymekno , Grant Limberg