diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 4e13040fe..57e09ed6a 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -1554,15 +1554,6 @@ void PostgreSQL::onlineNotificationThread() { waitForReady(); - if (_rc != NULL) { - onlineNotification_Redis(); - } else { - onlineNotification_Postgres(); - } -} - -void PostgreSQL::onlineNotification_Postgres() -{ PGconn *conn = getPgConn(); if (PQstatus(conn) == CONNECTION_BAD) { fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); @@ -1571,19 +1562,64 @@ void PostgreSQL::onlineNotification_Postgres() } _connected = 1; - nlohmann::json jtmp1, jtmp2; while (_run == 1) { + std::unordered_map< std::pair,std::pair,_PairHasher > lastOnline; + { + std::lock_guard l(_lastOnline_l); + lastOnline.swap(_lastOnline); + } + + onlineNotification_Postgres(conn, lastOnline); + + try { + if (!lastOnline.empty()) { + if (_rc->clusterMode) { + auto tx = _cluster->transaction(_myAddressStr, true); + _doRedisUpdate(tx, _myAddressStr, lastOnline); + } else { + auto tx = _redis->transaction(true); + _doRedisUpdate(tx, _myAddressStr, lastOnline); + } + } + } catch (sw::redis::Error &e) { +#ifdef ZT_TRACE + fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what()); +#endif + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str()); + PQfinish(conn); + if (_run == 1) { + fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(6); + } + + // if (_rc != NULL) { + // onlineNotification_Redis(); + // } else { + // onlineNotification_Postgres(); + // } +} + +void PostgreSQL::onlineNotification_Postgres(PGconn *conn, std::unordered_map< std::pair,std::pair,_PairHasher > &lastOnline) +{ + + + nlohmann::json jtmp1, jtmp2; + // while (_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); PQfinish(conn); exit(5); } - std::unordered_map< std::pair,std::pair,_PairHasher > lastOnline; - { - std::lock_guard l(_lastOnline_l); - lastOnline.swap(_lastOnline); - } + // std::unordered_map< std::pair,std::pair,_PairHasher > lastOnline; + // { + // std::lock_guard l(_lastOnline_l); + // lastOnline.swap(_lastOnline); + // } PGresult *res = NULL; @@ -1665,14 +1701,14 @@ void PostgreSQL::onlineNotification_Postgres() PQclear(res); } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str()); - PQfinish(conn); - if (_run == 1) { - fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str()); - exit(6); - } + // std::this_thread::sleep_for(std::chrono::milliseconds(10)); + // } + // fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str()); + // PQfinish(conn); + // if (_run == 1) { + // fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + // exit(6); + // } } void PostgreSQL::onlineNotification_Redis() diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index f61670132..061349e1e 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -70,7 +70,7 @@ private: void commitThread(); void onlineNotificationThread(); - void onlineNotification_Postgres(); + void onlineNotification_Postgres(PGconn *conn, std::unordered_map< std::pair,std::pair,_PairHasher > &lastOnline); void onlineNotification_Redis(); void _doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId, std::unordered_map< std::pair,std::pair,_PairHasher > &lastOnline);