remove redundant writes when changes come from Central

network & member changes tagged with `"fromCentral": true` will not be rewritten to the db
This commit is contained in:
Grant Limberg 2020-10-05 11:02:40 -07:00
parent 610d4ff016
commit f9396f979f
No known key found for this signature in database
GPG Key ID: 2BA62CCABBB4095A

View File

@ -1078,160 +1078,165 @@ void PostgreSQL::commitThread()
const std::string objtype = (*config)["objtype"];
if (objtype == "member") {
try {
std::string memberId = (*config)["id"];
std::string networkId = (*config)["nwid"];
std::string identity = (*config)["identity"];
std::string target = "NULL";
bool fromCentral = OSUtils::jsonBool((*config)["fromCentral"], false);
if (!fromCentral) {
// Central already writes all of this to the DB on a change.
// No need for the controller to do it as well.
std::string memberId = (*config)["id"];
std::string networkId = (*config)["nwid"];
std::string identity = (*config)["identity"];
std::string target = "NULL";
if (!(*config)["remoteTraceTarget"].is_null()) {
target = (*config)["remoteTraceTarget"];
}
std::string caps = OSUtils::jsonDump((*config)["capabilities"], -1);
std::string lastAuthTime = std::to_string((long long)(*config)["lastAuthorizedTime"]);
std::string lastDeauthTime = std::to_string((long long)(*config)["lastDeauthorizedTime"]);
std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
std::string rev = std::to_string((unsigned long long)(*config)["revision"]);
std::string tags = OSUtils::jsonDump((*config)["tags"], -1);
std::string vmajor = std::to_string((int)(*config)["vMajor"]);
std::string vminor = std::to_string((int)(*config)["vMinor"]);
std::string vrev = std::to_string((int)(*config)["vRev"]);
std::string vproto = std::to_string((int)(*config)["vProto"]);
const char *values[19] = {
memberId.c_str(),
networkId.c_str(),
((*config)["activeBridge"] ? "true" : "false"),
((*config)["authorized"] ? "true" : "false"),
caps.c_str(),
identity.c_str(),
lastAuthTime.c_str(),
lastDeauthTime.c_str(),
((*config)["noAutoAssignIps"] ? "true" : "false"),
rtraceLevel.c_str(),
(target == "NULL") ? NULL : target.c_str(),
rev.c_str(),
tags.c_str(),
vmajor.c_str(),
vminor.c_str(),
vrev.c_str(),
vproto.c_str()
};
PGresult *res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error beginning update transaction: %s\n", PQresultErrorMessage(res));
PQclear(res);
delete config;
config = nullptr;
continue;
}
res = PQexecParams(conn,
"INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
"identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
"remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
"VALUES ($1, $2, $3, $4, $5, $6, "
"TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
"$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET "
"active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, "
"identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
"last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
"remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
"revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
"v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto",
17,
NULL,
values,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res));
fprintf(stderr, "%s", OSUtils::jsonDump(*config, 2).c_str());
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
PQclear(res);
const char *v2[2] = {
memberId.c_str(),
networkId.c_str()
};
res = PQexecParams(conn,
"DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
2,
NULL,
v2,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));;
delete config;
config = nullptr;
continue;
}
PQclear(res);
std::vector<std::string> assignments;
bool ipAssignError = false;
for (auto i = (*config)["ipAssignments"].begin(); i != (*config)["ipAssignments"].end(); ++i) {
std::string addr = *i;
if (std::find(assignments.begin(), assignments.end(), addr) != assignments.end()) {
continue;
if (!(*config)["remoteTraceTarget"].is_null()) {
target = (*config)["remoteTraceTarget"];
}
const char *v3[3] = {
std::string caps = OSUtils::jsonDump((*config)["capabilities"], -1);
std::string lastAuthTime = std::to_string((long long)(*config)["lastAuthorizedTime"]);
std::string lastDeauthTime = std::to_string((long long)(*config)["lastDeauthorizedTime"]);
std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
std::string rev = std::to_string((unsigned long long)(*config)["revision"]);
std::string tags = OSUtils::jsonDump((*config)["tags"], -1);
std::string vmajor = std::to_string((int)(*config)["vMajor"]);
std::string vminor = std::to_string((int)(*config)["vMinor"]);
std::string vrev = std::to_string((int)(*config)["vRev"]);
std::string vproto = std::to_string((int)(*config)["vProto"]);
const char *values[19] = {
memberId.c_str(),
networkId.c_str(),
addr.c_str()
((*config)["activeBridge"] ? "true" : "false"),
((*config)["authorized"] ? "true" : "false"),
caps.c_str(),
identity.c_str(),
lastAuthTime.c_str(),
lastDeauthTime.c_str(),
((*config)["noAutoAssignIps"] ? "true" : "false"),
rtraceLevel.c_str(),
(target == "NULL") ? NULL : target.c_str(),
rev.c_str(),
tags.c_str(),
vmajor.c_str(),
vminor.c_str(),
vrev.c_str(),
vproto.c_str()
};
PGresult *res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error beginning update transaction: %s\n", PQresultErrorMessage(res));
PQclear(res);
delete config;
config = nullptr;
continue;
}
res = PQexecParams(conn,
"INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING",
3,
"INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
"identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
"remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
"VALUES ($1, $2, $3, $4, $5, $6, "
"TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
"$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET "
"active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, "
"identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
"last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
"remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
"revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
"v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto",
17,
NULL,
v3,
values,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error setting IP addresses for member: %s\n", PQresultErrorMessage(res));
fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res));
fprintf(stderr, "%s", OSUtils::jsonDump(*config, 2).c_str());
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
ipAssignError = true;
break;
delete config;
config = nullptr;
continue;
}
PQclear(res);
assignments.push_back(addr);
}
if (ipAssignError) {
delete config;
config = nullptr;
continue;
}
res = PQexec(conn, "COMMIT");
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error committing member transaction: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
const char *v2[2] = {
memberId.c_str(),
networkId.c_str()
};
res = PQexecParams(conn,
"DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
2,
NULL,
v2,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));;
delete config;
config = nullptr;
continue;
}
PQclear(res);
std::vector<std::string> assignments;
bool ipAssignError = false;
for (auto i = (*config)["ipAssignments"].begin(); i != (*config)["ipAssignments"].end(); ++i) {
std::string addr = *i;
if (std::find(assignments.begin(), assignments.end(), addr) != assignments.end()) {
continue;
}
const char *v3[3] = {
memberId.c_str(),
networkId.c_str(),
addr.c_str()
};
res = PQexecParams(conn,
"INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING",
3,
NULL,
v3,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error setting IP addresses for member: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
ipAssignError = true;
break;
}
PQclear(res);
assignments.push_back(addr);
}
if (ipAssignError) {
delete config;
config = nullptr;
continue;
}
res = PQexec(conn, "COMMIT");
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error committing member transaction: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
}
const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
@ -1254,209 +1259,165 @@ void PostgreSQL::commitThread()
}
} else if (objtype == "network") {
try {
std::string id = (*config)["id"];
std::string controllerId = _myAddressStr.c_str();
std::string name = (*config)["name"];
std::string remoteTraceTarget("NULL");
if (!(*config)["remoteTraceTarget"].is_null()) {
remoteTraceTarget = (*config)["remoteTraceTarget"];
}
std::string rulesSource;
if ((*config)["rulesSource"].is_string()) {
rulesSource = (*config)["rulesSource"];
}
std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1);
std::string now = std::to_string(OSUtils::now());
std::string mtu = std::to_string((int)(*config)["mtu"]);
std::string mcastLimit = std::to_string((int)(*config)["multicastLimit"]);
std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
std::string rules = OSUtils::jsonDump((*config)["rules"], -1);
std::string tags = OSUtils::jsonDump((*config)["tags"], -1);
std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1);
std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1);
bool enableBroadcast = (*config)["enableBroadcast"];
bool isPrivate = (*config)["private"];
bool fromCentral = OSUtils::jsonBool((*config)["fromCentral"], false);
if (!fromCentral) {
std::string id = (*config)["id"];
std::string controllerId = _myAddressStr.c_str();
std::string name = (*config)["name"];
std::string remoteTraceTarget("NULL");
if (!(*config)["remoteTraceTarget"].is_null()) {
remoteTraceTarget = (*config)["remoteTraceTarget"];
}
std::string rulesSource;
if ((*config)["rulesSource"].is_string()) {
rulesSource = (*config)["rulesSource"];
}
std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1);
std::string now = std::to_string(OSUtils::now());
std::string mtu = std::to_string((int)(*config)["mtu"]);
std::string mcastLimit = std::to_string((int)(*config)["multicastLimit"]);
std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
std::string rules = OSUtils::jsonDump((*config)["rules"], -1);
std::string tags = OSUtils::jsonDump((*config)["tags"], -1);
std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1);
std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1);
bool enableBroadcast = (*config)["enableBroadcast"];
bool isPrivate = (*config)["private"];
const char *values[16] = {
id.c_str(),
controllerId.c_str(),
caps.c_str(),
enableBroadcast ? "true" : "false",
now.c_str(),
mtu.c_str(),
mcastLimit.c_str(),
name.c_str(),
isPrivate ? "true" : "false",
rtraceLevel.c_str(),
(remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()),
rules.c_str(),
rulesSource.c_str(),
tags.c_str(),
v4mode.c_str(),
v6mode.c_str(),
};
PGresult *res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res));
PQclear(res);
delete config;
config = nullptr;
continue;
}
PQclear(res);
// This ugly query exists because when we want to mirror networks to/from
// another data store (e.g. FileDB or LFDB) it is possible to get a network
// that doesn't exist in Central's database. This does an upsert and sets
// the owner_id to the "first" global admin in the user DB if the record
// did not previously exist. If the record already exists owner_id is left
// unchanged, so owner_id should be left out of the update clause.
res = PQexecParams(conn,
"INSERT INTO ztc_network (id, creation_time, owner_id, controller_id, capabilities, enable_broadcast, "
"last_modified, mtu, multicast_limit, name, private, "
"remote_trace_level, remote_trace_target, rules, rules_source, "
"tags, v4_assign_mode, v6_assign_mode) VALUES ("
"$1, TO_TIMESTAMP($5::double precision/1000), "
"(SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true AND modify = true AND read = true LIMIT 1),"
"$2, $3, $4, TO_TIMESTAMP($5::double precision/1000), "
"$6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) "
"ON CONFLICT (id) DO UPDATE set controller_id = EXCLUDED.controller_id, "
"capabilities = EXCLUDED.capabilities, enable_broadcast = EXCLUDED.enable_broadcast, "
"last_modified = EXCLUDED.last_modified, mtu = EXCLUDED.mtu, "
"multicast_limit = EXCLUDED.multicast_limit, name = EXCLUDED.name, "
"private = EXCLUDED.private, remote_trace_level = EXCLUDED.remote_trace_level, "
"remote_trace_target = EXCLUDED.remote_trace_target, rules = EXCLUDED.rules, "
"rules_source = EXCLUDED.rules_source, tags = EXCLUDED.tags, "
"v4_assign_mode = EXCLUDED.v4_assign_mode, v6_assign_mode = EXCLUDED.v6_assign_mode",
16,
NULL,
values,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
PQclear(res);
const char *params[1] = {
id.c_str()
};
res = PQexecParams(conn,
"DELETE FROM ztc_network_assignment_pool WHERE network_id = $1",
1,
NULL,
params,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
PQclear(res);
auto pool = (*config)["ipAssignmentPools"];
bool err = false;
for (auto i = pool.begin(); i != pool.end(); ++i) {
std::string start = (*i)["ipRangeStart"];
std::string end = (*i)["ipRangeEnd"];
const char *p[3] = {
const char *values[16] = {
id.c_str(),
start.c_str(),
end.c_str()
controllerId.c_str(),
caps.c_str(),
enableBroadcast ? "true" : "false",
now.c_str(),
mtu.c_str(),
mcastLimit.c_str(),
name.c_str(),
isPrivate ? "true" : "false",
rtraceLevel.c_str(),
(remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()),
rules.c_str(),
rulesSource.c_str(),
tags.c_str(),
v4mode.c_str(),
v6mode.c_str(),
};
PGresult *res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res));
PQclear(res);
delete config;
config = nullptr;
continue;
}
PQclear(res);
// This ugly query exists because when we want to mirror networks to/from
// another data store (e.g. FileDB or LFDB) it is possible to get a network
// that doesn't exist in Central's database. This does an upsert and sets
// the owner_id to the "first" global admin in the user DB if the record
// did not previously exist. If the record already exists owner_id is left
// unchanged, so owner_id should be left out of the update clause.
res = PQexecParams(conn,
"INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
"VALUES ($1, $2, $3)",
3,
"INSERT INTO ztc_network (id, creation_time, owner_id, controller_id, capabilities, enable_broadcast, "
"last_modified, mtu, multicast_limit, name, private, "
"remote_trace_level, remote_trace_target, rules, rules_source, "
"tags, v4_assign_mode, v6_assign_mode) VALUES ("
"$1, TO_TIMESTAMP($5::double precision/1000), "
"(SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true AND modify = true AND read = true LIMIT 1),"
"$2, $3, $4, TO_TIMESTAMP($5::double precision/1000), "
"$6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) "
"ON CONFLICT (id) DO UPDATE set controller_id = EXCLUDED.controller_id, "
"capabilities = EXCLUDED.capabilities, enable_broadcast = EXCLUDED.enable_broadcast, "
"last_modified = EXCLUDED.last_modified, mtu = EXCLUDED.mtu, "
"multicast_limit = EXCLUDED.multicast_limit, name = EXCLUDED.name, "
"private = EXCLUDED.private, remote_trace_level = EXCLUDED.remote_trace_level, "
"remote_trace_target = EXCLUDED.remote_trace_target, rules = EXCLUDED.rules, "
"rules_source = EXCLUDED.rules_source, tags = EXCLUDED.tags, "
"v4_assign_mode = EXCLUDED.v4_assign_mode, v6_assign_mode = EXCLUDED.v6_assign_mode",
16,
NULL,
p,
values,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
PQclear(res);
const char *params[1] = {
id.c_str()
};
res = PQexecParams(conn,
"DELETE FROM ztc_network_assignment_pool WHERE network_id = $1",
1,
NULL,
params,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
PQclear(res);
err = true;
break;
}
PQclear(res);
}
if (err) {
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
res = PQexecParams(conn,
"DELETE FROM ztc_network_route WHERE network_id = $1",
1,
NULL,
params,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
auto routes = (*config)["routes"];
err = false;
for (auto i = routes.begin(); i != routes.end(); ++i) {
std::string t = (*i)["target"];
std::vector<std::string> target;
std::istringstream f(t);
std::string s;
while(std::getline(f, s, '/')) {
target.push_back(s);
}
if (target.empty() || target.size() != 2) {
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
std::string targetAddr = target[0];
std::string targetBits = target[1];
std::string via = "NULL";
if (!(*i)["via"].is_null()) {
via = (*i)["via"];
PQclear(res);
auto pool = (*config)["ipAssignmentPools"];
bool err = false;
for (auto i = pool.begin(); i != pool.end(); ++i) {
std::string start = (*i)["ipRangeStart"];
std::string end = (*i)["ipRangeEnd"];
const char *p[3] = {
id.c_str(),
start.c_str(),
end.c_str()
};
res = PQexecParams(conn,
"INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
"VALUES ($1, $2, $3)",
3,
NULL,
p,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
PQclear(res);
err = true;
break;
}
PQclear(res);
}
if (err) {
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
const char *p[4] = {
id.c_str(),
targetAddr.c_str(),
targetBits.c_str(),
(via == "NULL" ? NULL : via.c_str()),
};
res = PQexecParams(conn,
"INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)",
4,
"DELETE FROM ztc_network_route WHERE network_id = $1",
1,
NULL,
p,
params,
NULL,
NULL,
0);
@ -1464,61 +1425,108 @@ void PostgreSQL::commitThread()
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
auto routes = (*config)["routes"];
err = false;
for (auto i = routes.begin(); i != routes.end(); ++i) {
std::string t = (*i)["target"];
std::vector<std::string> target;
std::istringstream f(t);
std::string s;
while(std::getline(f, s, '/')) {
target.push_back(s);
}
if (target.empty() || target.size() != 2) {
continue;
}
std::string targetAddr = target[0];
std::string targetBits = target[1];
std::string via = "NULL";
if (!(*i)["via"].is_null()) {
via = (*i)["via"];
}
const char *p[4] = {
id.c_str(),
targetAddr.c_str(),
targetBits.c_str(),
(via == "NULL" ? NULL : via.c_str()),
};
res = PQexecParams(conn,
"INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)",
4,
NULL,
p,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
PQclear(res);
err = true;
break;
}
PQclear(res);
}
if (err) {
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
auto dns = (*config)["dns"];
std::string domain = dns["domain"];
std::stringstream servers;
servers << "{";
for (auto j = dns["servers"].begin(); j < dns["servers"].end(); ++j) {
servers << *j;
if ( (j+1) != dns["servers"].end()) {
servers << ",";
}
}
servers << "}";
const char *p[3] = {
id.c_str(),
domain.c_str(),
servers.str().c_str()
};
res = PQexecParams(conn, "INSERT INTO ztc_network_dns (network_id, domain, servers) VALUES ($1, $2, $3) ON CONFLICT (network_id) DO UPDATE SET domain = EXCLUDED.domain, servers = EXCLUDED.servers",
3,
NULL,
p,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating DNS: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
err = true;
break;
}
PQclear(res);
}
if (err) {
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
auto dns = (*config)["dns"];
std::string domain = dns["domain"];
std::stringstream servers;
servers << "{";
for (auto j = dns["servers"].begin(); j < dns["servers"].end(); ++j) {
servers << *j;
if ( (j+1) != dns["servers"].end()) {
servers << ",";
res = PQexec(conn, "COMMIT");
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
}
servers << "}";
const char *p[3] = {
id.c_str(),
domain.c_str(),
servers.str().c_str()
};
res = PQexecParams(conn, "INSERT INTO ztc_network_dns (network_id, domain, servers) VALUES ($1, $2, $3) ON CONFLICT (network_id) DO UPDATE SET domain = EXCLUDED.domain, servers = EXCLUDED.servers",
3,
NULL,
p,
NULL,
NULL,
0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error updating DNS: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
err = true;
break;
}
PQclear(res);
res = PQexec(conn, "COMMIT");
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res));
PQclear(res);
PQclear(PQexec(conn, "ROLLBACK"));
delete config;
config = nullptr;
continue;
}
PQclear(res);
const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
if (nwidInt) {
@ -1639,7 +1647,7 @@ void PostgreSQL::commitThread()
fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
PQfinish(conn);