update # of threads for Postgres

This commit is contained in:
Grant Limberg 2018-12-06 13:08:31 -08:00
parent 5535cad773
commit 1c86f25fab
3 changed files with 8 additions and 9 deletions

View File

@ -35,8 +35,6 @@
#include "../ext/json/json.hpp" #include "../ext/json/json.hpp"
#define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 2
namespace ZeroTier namespace ZeroTier
{ {

View File

@ -77,7 +77,7 @@ PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId
_heartbeatThread = std::thread(&PostgreSQL::heartbeat, this); _heartbeatThread = std::thread(&PostgreSQL::heartbeat, this);
_membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this); _membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this);
_networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this); _networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this);
for (int i = 0; i < ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS; ++i) { for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
_commitThread[i] = std::thread(&PostgreSQL::commitThread, this); _commitThread[i] = std::thread(&PostgreSQL::commitThread, this);
} }
_onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this); _onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this);
@ -91,7 +91,7 @@ PostgreSQL::~PostgreSQL()
_heartbeatThread.join(); _heartbeatThread.join();
_membersDbWatcher.join(); _membersDbWatcher.join();
_networksDbWatcher.join(); _networksDbWatcher.join();
for (int i = 0; i < ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS; ++i) { for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
_commitThread[i].join(); _commitThread[i].join();
} }
_onlineNotificationThread.join(); _onlineNotificationThread.join();
@ -518,8 +518,9 @@ void PostgreSQL::heartbeat()
} }
while (_run == 1) { while (_run == 1) {
if(PQstatus(conn) != CONNECTION_OK) { if(PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "%s heartbeat thread lost connection to Database\n", _myAddressStr.c_str());
PQfinish(conn); PQfinish(conn);
conn = getPgConn(); exit(6);
} }
if (conn) { if (conn) {
std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR); std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR);
@ -1161,8 +1162,7 @@ void PostgreSQL::onlineNotificationThread()
if (PQstatus(conn) != CONNECTION_OK) { if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
PQfinish(conn); PQfinish(conn);
conn = getPgConn(); exit(5);
continue;
} }
// map used to send notifications to front end // map used to send notifications to front end
@ -1326,7 +1326,6 @@ void PostgreSQL::onlineNotificationThread()
} }
fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread", _myAddressStr.c_str()); fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread", _myAddressStr.c_str());
PQfinish(conn); PQfinish(conn);
exit(5);
} }
PGconn *PostgreSQL::getPgConn(OverrideMode m) { PGconn *PostgreSQL::getPgConn(OverrideMode m) {

View File

@ -23,6 +23,8 @@
#include "DB.hpp" #include "DB.hpp"
#define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4
extern "C" { extern "C" {
typedef struct pg_conn PGconn; typedef struct pg_conn PGconn;
} }
@ -79,7 +81,7 @@ private:
std::thread _heartbeatThread; std::thread _heartbeatThread;
std::thread _membersDbWatcher; std::thread _membersDbWatcher;
std::thread _networksDbWatcher; std::thread _networksDbWatcher;
std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS]; std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS];
std::thread _onlineNotificationThread; std::thread _onlineNotificationThread;
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > _lastOnline; std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > _lastOnline;