2018-08-31 21:58:15 +00:00
/*
2019-08-23 16:23:39 +00:00
* Copyright ( c ) 2019 ZeroTier , Inc .
2018-08-31 21:58:15 +00:00
*
2019-08-23 16:23:39 +00:00
* Use of this software is governed by the Business Source License included
* in the LICENSE . TXT file in the project ' s root directory .
2018-08-31 21:58:15 +00:00
*
2020-08-20 19:51:39 +00:00
* Change Date : 2025 - 01 - 01
2018-08-31 21:58:15 +00:00
*
2019-08-23 16:23:39 +00:00
* On the date above , in accordance with the Business Source License , use
* of this software will be governed by version 2.0 of the Apache License .
2018-08-31 21:58:15 +00:00
*/
2019-08-23 16:23:39 +00:00
/****/
2018-08-31 21:58:15 +00:00
2019-08-06 12:51:50 +00:00
# include "PostgreSQL.hpp"
2018-08-31 21:58:15 +00:00
# ifdef ZT_CONTROLLER_USE_LIBPQ
2019-08-06 12:51:50 +00:00
# include "../node/Constants.hpp"
2018-08-31 21:58:15 +00:00
# include "EmbeddedNetworkController.hpp"
# include "../version.h"
2020-05-11 22:03:56 +00:00
# include "Redis.hpp"
2018-08-31 21:58:15 +00:00
2018-11-30 18:37:27 +00:00
# include <libpq-fe.h>
2018-09-28 17:55:39 +00:00
# include <sstream>
2020-05-20 23:54:18 +00:00
# include <climits>
2020-05-11 19:29:06 +00:00
2018-08-31 21:58:15 +00:00
using json = nlohmann : : json ;
2019-08-06 12:51:50 +00:00
2018-08-31 21:58:15 +00:00
namespace {

/// Lowest ztc_database schema version this controller accepts.
static const int DB_MINIMUM_VERSION = 5;

/**
 * Current time rendered by ctime(), with the trailing newline removed.
 *
 * The returned pointer is whatever ctime() handed back, so the text is only
 * valid until the next call that reuses ctime()'s buffer.
 *
 * @return timestamp text, or "" when ctime() returns a null pointer
 */
static const char *_timestr()
{
	time_t now = time(0);
	char *const stamp = ctime(&now);
	if (stamp == nullptr) {
		return "";
	}

	// Terminate the string at the first newline, if any.
	for (char *cursor = stamp; *cursor != (char)0; ++cursor) {
		if (*cursor == '\n') {
			*cursor = (char)0;
			break;
		}
	}
	return stamp;
}

/*
std::string join(const std::vector<std::string> &elements, const char * const separator)
{
	switch(elements.size()) {
	case 0:
		return "";
	case 1:
		return elements[0];
	default:
		std::ostringstream os;
		std::copy(elements.begin(), elements.end()-1, std::ostream_iterator<std::string>(os, separator));
		os << *elements.rbegin();
		return os.str();
	}
}
*/

} // anonymous namespace
2018-08-31 21:58:15 +00:00
using namespace ZeroTier ;
2020-05-12 18:56:19 +00:00
// Container shapes used when reading entries back from a Redis stream.

// Field/value pairs carried by a single stream entry.
using Attrs = std::vector< std::pair<std::string, std::string> >;
// One stream entry: its id plus its field/value pairs.
using Item = std::pair<std::string, Attrs>;
// Ordered list of entries returned for one stream key.
using ItemStream = std::vector<Item>;
2020-05-11 22:03:56 +00:00
/**
 * Construct the PostgreSQL-backed controller database.
 *
 * Steps performed, in order:
 *  1. build the connection string from `path` plus an application_name tag,
 *  2. verify the ztc_database schema version is at least DB_MINIMUM_VERSION,
 *  3. create the Redis (standalone or cluster) client when `rc` is given,
 *  4. take _readyLock, load all networks and members synchronously, and
 *  5. start the heartbeat, watcher, commit and online-notification threads.
 *
 * Any failure during steps 1-2 prints to stderr and terminates the process.
 *
 * @param myId        identity of this controller
 * @param path        libpq connection string (must not be NULL)
 * @param listenPort  port reported in the heartbeat record
 * @param rc          Redis configuration, or NULL to run without Redis
 */
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc)
	: DB()
	, _myId(myId)
	, _myAddress(myId.address())
	, _ready(0)
	, _connected(1)
	, _run(1)
	, _waitNoticePrinted(false)
	, _listenPort(listenPort)
	, _rc(rc)
	, _redis(NULL)
	, _cluster(NULL)
{
	char myAddress[64];
	_myAddressStr = myId.address().toString(myAddress);
	_connString = std::string(path) + " application_name=controller_" + _myAddressStr;

	// Database Schema Version Check
	PGconn *conn = getPgConn();
	if (PQstatus(conn) != CONNECTION_OK) {
		fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
		exit(1);
	}

	PGresult *res = PQexec(conn, "SELECT version FROM ztc_database");
	if (PQresultStatus(res) != PGRES_TUPLES_OK) {
		fprintf(stderr, "Error determining database version");
		exit(1);
	}

	if (PQntuples(res) != 1) {
		fprintf(stderr, "Invalid number of db version tuples returned.");
		exit(1);
	}

	// std::stoi throws on a NULL/non-numeric/out-of-range value. Treat that
	// like every other version-check failure instead of letting the exception
	// escape the constructor with conn and res still open.
	int dbVersion = 0;
	try {
		dbVersion = std::stoi(PQgetvalue(res, 0, 0));
	} catch (std::exception &e) {
		fprintf(stderr, "Error determining database version");
		exit(1);
	}

	if (dbVersion < DB_MINIMUM_VERSION) {
		fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION);
		exit(1);
	}
	PQclear(res);
	res = NULL;

	if (_rc != NULL) {
		sw::redis::ConnectionOptions opts;
		sw::redis::ConnectionPoolOptions poolOpts;
		opts.host = _rc->hostname;
		opts.port = _rc->port;
		opts.password = _rc->password;
		opts.db = 0;
		poolOpts.size = 10;
		if (_rc->clusterMode) {
			fprintf(stderr, "Using Redis in Cluster Mode\n");
			_cluster = std::make_shared<sw::redis::RedisCluster>(opts, poolOpts);
		} else {
			fprintf(stderr, "Using Redis in Standalone Mode\n");
			_redis = std::make_shared<sw::redis::Redis>(opts, poolOpts);
		}
	}

	// Held until both initial loads have completed; the load that brings
	// _ready to 2 releases it.
	_readyLock.lock();

	fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
	_waitNoticePrinted = true;

	initializeNetworks(conn);
	initializeMembers(conn);

	PQfinish(conn);
	conn = NULL;

	_heartbeatThread = std::thread(&PostgreSQL::heartbeat, this);
	_membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this);
	_networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this);
	for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
		_commitThread[i] = std::thread(&PostgreSQL::commitThread, this);
	}
	_onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this);
}
/**
 * Stop all background threads and wait for them to finish.
 */
PostgreSQL::~PostgreSQL()
{
	// Clear the run flag the worker loops poll, then pause briefly before joining.
	_run = 0;
	const std::chrono::milliseconds pause(100);
	std::this_thread::sleep_for(pause);

	_heartbeatThread.join();
	_membersDbWatcher.join();
	_networksDbWatcher.join();

	// The commit queue is stopped before its consumer threads are joined.
	_commitQueue.stop();
	int idx = 0;
	while (idx < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS) {
		_commitThread[idx].join();
		++idx;
	}

	_onlineNotificationThread.join();
}
bool PostgreSQL : : waitForReady ( )
{
while ( _ready < 2 ) {
_readyLock . lock ( ) ;
_readyLock . unlock ( ) ;
}
return true ;
}
bool PostgreSQL : : isReady ( )
{
return ( ( _ready = = 2 ) & & ( _connected ) ) ;
}
2019-08-06 15:42:54 +00:00
bool PostgreSQL : : save ( nlohmann : : json & record , bool notifyListeners )
2018-08-31 21:58:15 +00:00
{
2019-08-06 15:42:54 +00:00
bool modified = false ;
2018-10-24 19:06:17 +00:00
try {
2021-09-21 15:51:26 +00:00
if ( ! record . is_object ( ) )
2019-08-06 16:00:35 +00:00
return false ;
2019-08-06 13:51:23 +00:00
const std : : string objtype = record [ " objtype " ] ;
if ( objtype = = " network " ) {
const uint64_t nwid = OSUtils : : jsonIntHex ( record [ " id " ] , 0ULL ) ;
if ( nwid ) {
nlohmann : : json old ;
get ( nwid , old ) ;
2019-08-08 20:29:13 +00:00
if ( ( ! old . is_object ( ) ) | | ( ! _compareRecords ( old , record ) ) ) {
2019-08-06 13:51:23 +00:00
record [ " revision " ] = OSUtils : : jsonInt ( record [ " revision " ] , 0ULL ) + 1ULL ;
2019-08-06 15:42:54 +00:00
_commitQueue . post ( std : : pair < nlohmann : : json , bool > ( record , notifyListeners ) ) ;
modified = true ;
2019-08-06 13:51:23 +00:00
}
}
} else if ( objtype = = " member " ) {
const uint64_t nwid = OSUtils : : jsonIntHex ( record [ " nwid " ] , 0ULL ) ;
const uint64_t id = OSUtils : : jsonIntHex ( record [ " id " ] , 0ULL ) ;
if ( ( id ) & & ( nwid ) ) {
nlohmann : : json network , old ;
get ( nwid , network , id , old ) ;
2019-08-08 20:29:13 +00:00
if ( ( ! old . is_object ( ) ) | | ( ! _compareRecords ( old , record ) ) ) {
2019-08-06 13:51:23 +00:00
record [ " revision " ] = OSUtils : : jsonInt ( record [ " revision " ] , 0ULL ) + 1ULL ;
2019-08-06 15:42:54 +00:00
_commitQueue . post ( std : : pair < nlohmann : : json , bool > ( record , notifyListeners ) ) ;
modified = true ;
2019-08-06 13:51:23 +00:00
}
}
2018-10-24 19:06:17 +00:00
}
} catch ( std : : exception & e ) {
fprintf ( stderr , " Error on PostgreSQL::save: %s \n " , e . what ( ) ) ;
} catch ( . . . ) {
fprintf ( stderr , " Unknown error on PostgreSQL::save \n " ) ;
2018-08-31 21:58:15 +00:00
}
2019-08-06 15:42:54 +00:00
return modified ;
2018-08-31 21:58:15 +00:00
}
void PostgreSQL : : eraseNetwork ( const uint64_t networkId )
{
char tmp2 [ 24 ] ;
waitForReady ( ) ;
Utils : : hex ( networkId , tmp2 ) ;
2019-08-06 15:42:54 +00:00
std : : pair < nlohmann : : json , bool > tmp ;
tmp . first [ " id " ] = tmp2 ;
tmp . first [ " objtype " ] = " _delete_network " ;
tmp . second = true ;
2018-08-31 21:58:15 +00:00
_commitQueue . post ( tmp ) ;
2020-05-14 00:23:27 +00:00
nlohmann : : json nullJson ;
_networkChanged ( tmp . first , nullJson , true ) ;
2018-08-31 21:58:15 +00:00
}
2019-08-23 16:23:39 +00:00
void PostgreSQL : : eraseMember ( const uint64_t networkId , const uint64_t memberId )
2018-08-31 21:58:15 +00:00
{
char tmp2 [ 24 ] ;
2020-05-20 23:28:28 +00:00
waitForReady ( ) ;
2020-05-14 00:23:27 +00:00
std : : pair < nlohmann : : json , bool > tmp , nw ;
2018-08-31 21:58:15 +00:00
Utils : : hex ( networkId , tmp2 ) ;
2019-08-06 15:42:54 +00:00
tmp . first [ " nwid " ] = tmp2 ;
2018-08-31 21:58:15 +00:00
Utils : : hex ( memberId , tmp2 ) ;
2019-08-06 15:42:54 +00:00
tmp . first [ " id " ] = tmp2 ;
tmp . first [ " objtype " ] = " _delete_member " ;
tmp . second = true ;
2018-08-31 21:58:15 +00:00
_commitQueue . post ( tmp ) ;
2020-05-14 00:23:27 +00:00
nlohmann : : json nullJson ;
_memberChanged ( tmp . first , nullJson , true ) ;
2018-08-31 21:58:15 +00:00
}
void PostgreSQL : : nodeIsOnline ( const uint64_t networkId , const uint64_t memberId , const InetAddress & physicalAddress )
{
2019-08-06 15:42:54 +00:00
std : : lock_guard < std : : mutex > l ( _lastOnline_l ) ;
std : : pair < int64_t , InetAddress > & i = _lastOnline [ std : : pair < uint64_t , uint64_t > ( networkId , memberId ) ] ;
i . first = OSUtils : : now ( ) ;
if ( physicalAddress ) {
i . second = physicalAddress ;
2018-08-31 21:58:15 +00:00
}
}
2021-09-21 15:51:26 +00:00
void PostgreSQL : : initializeNetworks ( PGconn * conn )
2018-08-31 21:58:15 +00:00
{
2018-09-05 18:49:07 +00:00
try {
2021-09-21 15:51:26 +00:00
if ( PQstatus ( conn ) ! = CONNECTION_OK ) {
fprintf ( stderr , " Bad Database Connection: %s " , PQerrorMessage ( conn ) ) ;
exit ( 1 ) ;
}
std : : string setKey = " networks:{ " + _myAddressStr + " } " ;
2018-09-04 21:00:02 +00:00
2021-09-21 15:51:26 +00:00
// if (_rc != NULL) {
// try {
// if (_rc->clusterMode) {
// _cluster->del(setKey);
// } else {
// _redis->del(setKey);
// }
// } catch (sw::redis::Error &e) {
// // del can throw an error if the key doesn't exist
// // swallow it and move along
// }
// }
2020-05-20 23:28:28 +00:00
2021-09-21 15:51:26 +00:00
std : : unordered_set < std : : string > networkSet ;
const char * params [ 1 ] = {
_myAddressStr . c_str ( )
} ;
fprintf ( stderr , " Initializing Networks... \n " ) ;
PGresult * res = PQexecParams ( conn , " SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, "
" enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, "
" remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network "
" WHERE deleted = false AND controller_id = $1 " ,
1 ,
NULL ,
params ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK ) {
fprintf ( stderr , " Networks Initialization Failed: %s " , PQerrorMessage ( conn ) ) ;
PQclear ( res ) ;
exit ( 1 ) ;
}
int numRows = PQntuples ( res ) ;
for ( int i = 0 ; i < numRows ; + + i ) {
json empty ;
json config ;
const char * nwidparam [ 1 ] = {
PQgetvalue ( res , i , 0 )
} ;
std : : string nwid = PQgetvalue ( res , i , 0 ) ;
networkSet . insert ( nwid ) ;
config [ " id " ] = nwid ;
config [ " nwid " ] = nwid ;
try {
config [ " creationTime " ] = std : : stoull ( PQgetvalue ( res , i , 1 ) ) ;
} catch ( std : : exception & e ) {
config [ " creationTime " ] = 0ULL ;
//fprintf(stderr, "Error converting creation time: %s\n", PQgetvalue(res, i, 1));
}
config [ " capabilities " ] = json : : parse ( PQgetvalue ( res , i , 2 ) ) ;
config [ " enableBroadcast " ] = ( strcmp ( PQgetvalue ( res , i , 3 ) , " t " ) = = 0 ) ;
try {
config [ " lastModified " ] = std : : stoull ( PQgetvalue ( res , i , 4 ) ) ;
} catch ( std : : exception & e ) {
config [ " lastModified " ] = 0ULL ;
//fprintf(stderr, "Error converting last modified: %s\n", PQgetvalue(res, i, 4));
2018-11-12 05:08:51 +00:00
}
2021-09-21 15:51:26 +00:00
try {
config [ " mtu " ] = std : : stoi ( PQgetvalue ( res , i , 5 ) ) ;
} catch ( std : : exception & e ) {
config [ " mtu " ] = 2800 ;
}
try {
config [ " multicastLimit " ] = std : : stoi ( PQgetvalue ( res , i , 6 ) ) ;
} catch ( std : : exception & e ) {
config [ " multicastLimit " ] = 64 ;
}
config [ " name " ] = PQgetvalue ( res , i , 7 ) ;
config [ " private " ] = ( strcmp ( PQgetvalue ( res , i , 8 ) , " t " ) = = 0 ) ;
try {
config [ " remoteTraceLevel " ] = std : : stoi ( PQgetvalue ( res , i , 9 ) ) ;
} catch ( std : : exception & e ) {
config [ " remoteTraceLevel " ] = 0 ;
}
config [ " remoteTraceTarget " ] = PQgetvalue ( res , i , 10 ) ;
try {
config [ " revision " ] = std : : stoull ( PQgetvalue ( res , i , 11 ) ) ;
} catch ( std : : exception & e ) {
config [ " revision " ] = 0ULL ;
//fprintf(stderr, "Error converting revision: %s\n", PQgetvalue(res, i, 11));
}
config [ " rules " ] = json : : parse ( PQgetvalue ( res , i , 12 ) ) ;
config [ " tags " ] = json : : parse ( PQgetvalue ( res , i , 13 ) ) ;
config [ " v4AssignMode " ] = json : : parse ( PQgetvalue ( res , i , 14 ) ) ;
config [ " v6AssignMode " ] = json : : parse ( PQgetvalue ( res , i , 15 ) ) ;
config [ " objtype " ] = " network " ;
config [ " ipAssignmentPools " ] = json : : array ( ) ;
config [ " routes " ] = json : : array ( ) ;
2018-09-05 18:49:07 +00:00
2021-09-21 15:51:26 +00:00
PGresult * r2 = PQexecParams ( conn ,
" SELECT host(ip_range_start), host(ip_range_end) FROM ztc_network_assignment_pool WHERE network_id = $1 " ,
1 ,
NULL ,
nwidparam ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( r2 ) ! = PGRES_TUPLES_OK ) {
fprintf ( stderr , " ERROR: Error retreiving IP pools for network: %s \n " , PQresultErrorMessage ( r2 ) ) ;
PQclear ( r2 ) ;
PQclear ( res ) ;
exit ( 1 ) ;
2018-11-30 18:37:27 +00:00
}
2021-09-21 15:51:26 +00:00
int n = PQntuples ( r2 ) ;
for ( int j = 0 ; j < n ; + + j ) {
json ip ;
ip [ " ipRangeStart " ] = PQgetvalue ( r2 , j , 0 ) ;
ip [ " ipRangeEnd " ] = PQgetvalue ( r2 , j , 1 ) ;
2018-11-30 18:37:27 +00:00
2021-09-21 15:51:26 +00:00
config [ " ipAssignmentPools " ] . push_back ( ip ) ;
}
2018-09-04 23:05:34 +00:00
2021-09-21 15:51:26 +00:00
PQclear ( r2 ) ;
r2 = PQexecParams ( conn ,
" SELECT host(address), bits, host(via) FROM ztc_network_route WHERE network_id = $1 " ,
1 ,
NULL ,
nwidparam ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( r2 ) ! = PGRES_TUPLES_OK ) {
fprintf ( stderr , " ERROR: Error retreiving routes for network: %s \n " , PQresultErrorMessage ( r2 ) ) ;
PQclear ( r2 ) ;
PQclear ( res ) ;
exit ( 1 ) ;
}
2018-11-30 18:37:27 +00:00
2021-09-21 15:51:26 +00:00
n = PQntuples ( r2 ) ;
for ( int j = 0 ; j < n ; + + j ) {
std : : string addr = PQgetvalue ( r2 , j , 0 ) ;
std : : string bits = PQgetvalue ( r2 , j , 1 ) ;
std : : string via = PQgetvalue ( r2 , j , 2 ) ;
json route ;
route [ " target " ] = addr + " / " + bits ;
2018-11-30 18:37:27 +00:00
2021-09-21 15:51:26 +00:00
if ( via = = " NULL " ) {
route [ " via " ] = nullptr ;
} else {
route [ " via " ] = via ;
}
config [ " routes " ] . push_back ( route ) ;
}
2018-09-04 23:05:34 +00:00
2021-09-21 15:51:26 +00:00
r2 = PQexecParams ( conn ,
" SELECT domain, servers FROM ztc_network_dns WHERE network_id = $1 " ,
1 ,
NULL ,
nwidparam ,
NULL ,
NULL ,
0 ) ;
2020-08-04 16:45:45 +00:00
2021-09-21 15:51:26 +00:00
if ( PQresultStatus ( r2 ) ! = PGRES_TUPLES_OK ) {
fprintf ( stderr , " ERROR: Error retrieving DNS settings for network: %s \n " , PQresultErrorMessage ( r2 ) ) ;
PQclear ( r2 ) ;
PQclear ( res ) ;
exit ( 1 ) ;
}
n = PQntuples ( r2 ) ;
if ( n > 1 ) {
fprintf ( stderr , " ERROR: invalid number of DNS configurations for network %s. Must be 0 or 1 \n " , nwid . c_str ( ) ) ;
} else if ( n = = 1 ) {
2020-08-04 16:45:45 +00:00
json obj ;
2021-09-21 15:51:26 +00:00
std : : string domain = PQgetvalue ( r2 , 0 , 0 ) ;
std : : string serverList = PQgetvalue ( r2 , 0 , 1 ) ;
2020-08-04 16:45:45 +00:00
auto servers = json : : array ( ) ;
if ( serverList . rfind ( " { " , 0 ) ! = std : : string : : npos ) {
serverList = serverList . substr ( 1 , serverList . size ( ) - 2 ) ;
std : : stringstream ss ( serverList ) ;
while ( ss . good ( ) ) {
std : : string server ;
std : : getline ( ss , server , ' , ' ) ;
servers . push_back ( server ) ;
}
}
2021-09-21 15:51:26 +00:00
obj [ " domain " ] = domain ;
2020-08-04 16:45:45 +00:00
obj [ " servers " ] = servers ;
2020-08-12 20:08:47 +00:00
config [ " dns " ] = obj ;
2020-08-04 16:45:45 +00:00
}
2021-09-21 15:51:26 +00:00
PQclear ( r2 ) ;
2019-08-23 16:23:39 +00:00
2021-09-21 15:51:26 +00:00
_networkChanged ( empty , config , false ) ;
2018-09-05 18:49:07 +00:00
}
2018-09-04 21:00:02 +00:00
2021-09-21 15:51:26 +00:00
PQclear ( res ) ;
2018-08-31 21:58:15 +00:00
2021-09-21 15:51:26 +00:00
// if(!networkSet.empty()) {
// if (_rc && _rc->clusterMode) {
// auto tx = _cluster->transaction(_myAddressStr, true);
// tx.sadd(setKey, networkSet.begin(), networkSet.end());
// tx.exec();
// } else if (_rc && !_rc->clusterMode) {
// auto tx = _redis->transaction(true);
// tx.sadd(setKey, networkSet.begin(), networkSet.end());
// tx.exec();
// }
// }
2020-05-27 01:54:27 +00:00
2018-09-05 18:49:07 +00:00
if ( + + this - > _ready = = 2 ) {
if ( _waitNoticePrinted ) {
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL data download complete. " ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
_readyLock . unlock ( ) ;
2018-08-31 21:58:15 +00:00
}
2020-05-20 23:28:28 +00:00
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error initializing networks in Redis: %s \n " , e . what ( ) ) ;
exit ( - 1 ) ;
2018-09-05 18:49:07 +00:00
} catch ( std : : exception & e ) {
2020-05-20 23:28:28 +00:00
fprintf ( stderr , " ERROR: Error initializing networks: %s \n " , e . what ( ) ) ;
2018-09-05 18:49:07 +00:00
exit ( - 1 ) ;
2018-08-31 21:58:15 +00:00
}
}
2021-09-21 15:51:26 +00:00
void PostgreSQL : : initializeMembers ( PGconn * conn )
2018-08-31 21:58:15 +00:00
{
2018-09-05 18:49:07 +00:00
try {
2021-09-21 15:51:26 +00:00
if ( PQstatus ( conn ) ! = CONNECTION_OK ) {
fprintf ( stderr , " Bad Database Connection: %s " , PQerrorMessage ( conn ) ) ;
exit ( 1 ) ;
}
// std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
// if (_rc != NULL) {
// std::lock_guard<std::mutex> l(_networks_l);
// std::unordered_set<std::string> deletes;
// for ( auto it : _networks) {
// uint64_t nwid_i = it.first;
// char nwidTmp[64] = {0};
// OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
// std::string nwid(nwidTmp);
// std::string key = setKeyBase + nwid;
// deletes.insert(key);
// }
// if (!deletes.empty()) {
// if (_rc->clusterMode) {
// auto tx = _cluster->transaction(_myAddressStr, true);
// for (std::string k : deletes) {
// tx.del(k);
// }
// tx.exec();
// } else {
// auto tx = _redis->transaction(true);
// for (std::string k : deletes) {
// tx.del(k);
// }
// tx.exec();
// }
// }
// }
const char * params [ 1 ] = {
_myAddressStr . c_str ( )
} ;
2020-05-27 01:54:27 +00:00
std : : unordered_map < std : : string , std : : string > networkMembers ;
2021-09-21 15:20:15 +00:00
2021-09-21 15:51:26 +00:00
fprintf ( stderr , " Initializing Members... \n " ) ;
PGresult * res = PQexecParams ( conn ,
" SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, "
" EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, "
" EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, "
2018-11-30 18:37:27 +00:00
" m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, "
2021-09-21 15:51:26 +00:00
" m.no_auto_assign_ips, m.revision "
2018-09-05 18:49:07 +00:00
" FROM ztc_member m "
" INNER JOIN ztc_network n "
" ON n.id = m.network_id "
2021-09-21 15:51:26 +00:00
" WHERE n.controller_id = $1 AND m.deleted = false " ,
1 ,
NULL ,
params ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK ) {
fprintf ( stderr , " Member Initialization Failed: %s " , PQerrorMessage ( conn ) ) ;
PQclear ( res ) ;
exit ( 1 ) ;
}
int numRows = PQntuples ( res ) ;
for ( int i = 0 ; i < numRows ; + + i ) {
2018-09-05 18:49:07 +00:00
json empty ;
json config ;
2021-09-21 15:51:26 +00:00
std : : string memberId ( PQgetvalue ( res , i , 0 ) ) ;
std : : string networkId ( PQgetvalue ( res , i , 1 ) ) ;
// networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId));
std : : string ctime = PQgetvalue ( res , i , 5 ) ;
2018-09-05 18:49:07 +00:00
config [ " id " ] = memberId ;
config [ " nwid " ] = networkId ;
2021-09-21 15:51:26 +00:00
config [ " activeBridge " ] = ( strcmp ( PQgetvalue ( res , i , 2 ) , " t " ) = = 0 ) ;
config [ " authorized " ] = ( strcmp ( PQgetvalue ( res , i , 3 ) , " t " ) = = 0 ) ;
try {
config [ " capabilities " ] = json : : parse ( PQgetvalue ( res , i , 4 ) ) ;
} catch ( std : : exception & e ) {
config [ " capabilities " ] = json : : array ( ) ;
}
try {
config [ " creationTime " ] = std : : stoull ( PQgetvalue ( res , i , 5 ) ) ;
} catch ( std : : exception & e ) {
config [ " creationTime " ] = 0ULL ;
//fprintf(stderr, "Error upding creation time (member): %s\n", PQgetvalue(res, i, 5));
}
config [ " identity " ] = PQgetvalue ( res , i , 6 ) ;
try {
config [ " lastAuthorizedTime " ] = std : : stoull ( PQgetvalue ( res , i , 7 ) ) ;
} catch ( std : : exception & e ) {
config [ " lastAuthorizedTime " ] = 0ULL ;
//fprintf(stderr, "Error updating last auth time (member): %s\n", PQgetvalue(res, i, 7));
}
try {
config [ " lastDeauthorizedTime " ] = std : : stoull ( PQgetvalue ( res , i , 8 ) ) ;
} catch ( std : : exception & e ) {
config [ " lastDeauthorizedTime " ] = 0ULL ;
//fprintf(stderr, "Error updating last deauth time (member): %s\n", PQgetvalue(res, i, 8));
}
try {
config [ " remoteTraceLevel " ] = std : : stoi ( PQgetvalue ( res , i , 9 ) ) ;
} catch ( std : : exception & e ) {
config [ " remoteTraceLevel " ] = 0 ;
}
config [ " remoteTraceTarget " ] = PQgetvalue ( res , i , 10 ) ;
try {
config [ " tags " ] = json : : parse ( PQgetvalue ( res , i , 11 ) ) ;
} catch ( std : : exception & e ) {
config [ " tags " ] = json : : array ( ) ;
}
try {
config [ " vMajor " ] = std : : stoi ( PQgetvalue ( res , i , 12 ) ) ;
} catch ( std : : exception & e ) {
config [ " vMajor " ] = - 1 ;
}
try {
config [ " vMinor " ] = std : : stoi ( PQgetvalue ( res , i , 13 ) ) ;
} catch ( std : : exception & e ) {
config [ " vMinor " ] = - 1 ;
}
try {
config [ " vRev " ] = std : : stoi ( PQgetvalue ( res , i , 14 ) ) ;
} catch ( std : : exception & e ) {
config [ " vRev " ] = - 1 ;
}
try {
config [ " vProto " ] = std : : stoi ( PQgetvalue ( res , i , 15 ) ) ;
} catch ( std : : exception & e ) {
config [ " vProto " ] = - 1 ;
}
config [ " noAutoAssignIps " ] = ( strcmp ( PQgetvalue ( res , i , 16 ) , " t " ) = = 0 ) ;
try {
config [ " revision " ] = std : : stoull ( PQgetvalue ( res , i , 17 ) ) ;
} catch ( std : : exception & e ) {
config [ " revision " ] = 0ULL ;
//fprintf(stderr, "Error updating revision (member): %s\n", PQgetvalue(res, i, 17));
}
2018-09-05 18:49:07 +00:00
config [ " objtype " ] = " member " ;
config [ " ipAssignments " ] = json : : array ( ) ;
2021-09-21 15:51:26 +00:00
const char * p2 [ 2 ] = {
memberId . c_str ( ) ,
networkId . c_str ( )
} ;
PGresult * r2 = PQexecParams ( conn ,
" SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2 " ,
2 ,
NULL ,
p2 ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( r2 ) ! = PGRES_TUPLES_OK ) {
fprintf ( stderr , " Member Initialization Failed: %s " , PQerrorMessage ( conn ) ) ;
PQclear ( r2 ) ;
PQclear ( res ) ;
exit ( 1 ) ;
}
2018-09-05 18:49:07 +00:00
2021-09-21 15:51:26 +00:00
int n = PQntuples ( r2 ) ;
for ( int j = 0 ; j < n ; + + j ) {
std : : string ipaddr = PQgetvalue ( r2 , j , 0 ) ;
std : : size_t pos = ipaddr . find ( ' / ' ) ;
if ( pos ! = std : : string : : npos ) {
ipaddr = ipaddr . substr ( 0 , pos ) ;
2020-03-04 07:52:53 +00:00
}
2021-09-21 15:51:26 +00:00
config [ " ipAssignments " ] . push_back ( ipaddr ) ;
2018-09-05 18:49:07 +00:00
}
2018-08-31 21:58:15 +00:00
2018-09-05 18:49:07 +00:00
_memberChanged ( empty , config , false ) ;
2018-08-31 21:58:15 +00:00
}
2018-11-30 18:37:27 +00:00
2021-09-21 15:51:26 +00:00
PQclear ( res ) ;
// if (!networkMembers.empty()) {
// if (_rc != NULL) {
// if (_rc->clusterMode) {
// auto tx = _cluster->transaction(_myAddressStr, true);
// for (auto it : networkMembers) {
// tx.sadd(it.first, it.second);
// }
// tx.exec();
// } else {
// auto tx = _redis->transaction(true);
// for (auto it : networkMembers) {
// tx.sadd(it.first, it.second);
// }
// tx.exec();
// }
// }
// }
2018-09-05 18:49:07 +00:00
if ( + + this - > _ready = = 2 ) {
if ( _waitNoticePrinted ) {
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL data download complete. " ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
_readyLock . unlock ( ) ;
}
2020-05-20 23:28:28 +00:00
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error initializing members (redis): %s \n " , e . what ( ) ) ;
2018-09-05 18:49:07 +00:00
} catch ( std : : exception & e ) {
2021-09-21 15:51:26 +00:00
fprintf ( stderr , " ERROR: Error initializing members: %s \n " , e . what ( ) ) ;
2018-09-05 18:49:07 +00:00
exit ( - 1 ) ;
2018-08-31 21:58:15 +00:00
}
}
void PostgreSQL : : heartbeat ( )
{
char publicId [ 1024 ] ;
char hostnameTmp [ 1024 ] ;
_myId . toString ( false , publicId ) ;
if ( gethostname ( hostnameTmp , sizeof ( hostnameTmp ) ) ! = 0 ) {
hostnameTmp [ 0 ] = ( char ) 0 ;
} else {
2019-08-08 16:04:11 +00:00
for ( int i = 0 ; i < ( int ) sizeof ( hostnameTmp ) ; + + i ) {
2018-08-31 21:58:15 +00:00
if ( ( hostnameTmp [ i ] = = ' . ' ) | | ( hostnameTmp [ i ] = = 0 ) ) {
hostnameTmp [ i ] = ( char ) 0 ;
break ;
}
}
}
const char * controllerId = _myAddressStr . c_str ( ) ;
const char * publicIdentity = publicId ;
const char * hostname = hostnameTmp ;
2021-09-21 15:51:26 +00:00
PGconn * conn = getPgConn ( ) ;
if ( PQstatus ( conn ) = = CONNECTION_BAD ) {
fprintf ( stderr , " Connection to database failed: %s \n " , PQerrorMessage ( conn ) ) ;
PQfinish ( conn ) ;
exit ( 1 ) ;
}
2018-08-31 21:58:15 +00:00
while ( _run = = 1 ) {
2021-09-21 15:51:26 +00:00
if ( PQstatus ( conn ) ! = CONNECTION_OK ) {
fprintf ( stderr , " %s heartbeat thread lost connection to Database \n " , _myAddressStr . c_str ( ) ) ;
PQfinish ( conn ) ;
exit ( 6 ) ;
}
2020-05-22 21:16:04 +00:00
int64_t ts = OSUtils : : now ( ) ;
2021-09-21 15:51:26 +00:00
if ( conn ) {
2018-11-30 18:37:27 +00:00
std : : string major = std : : to_string ( ZEROTIER_ONE_VERSION_MAJOR ) ;
std : : string minor = std : : to_string ( ZEROTIER_ONE_VERSION_MINOR ) ;
std : : string rev = std : : to_string ( ZEROTIER_ONE_VERSION_REVISION ) ;
std : : string build = std : : to_string ( ZEROTIER_ONE_VERSION_BUILD ) ;
2020-05-22 21:16:04 +00:00
std : : string now = std : : to_string ( ts ) ;
2019-01-21 19:29:13 +00:00
std : : string host_port = std : : to_string ( _listenPort ) ;
2021-09-21 15:51:26 +00:00
std : : string use_redis = " false " ; // (_rc != NULL) ? "true" : "false";
const char * values [ 10 ] = {
controllerId ,
hostname ,
now . c_str ( ) ,
publicIdentity ,
major . c_str ( ) ,
minor . c_str ( ) ,
rev . c_str ( ) ,
build . c_str ( ) ,
host_port . c_str ( ) ,
use_redis . c_str ( )
} ;
PGresult * res = PQexecParams ( conn ,
" INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis) "
" VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) "
2018-11-30 18:37:27 +00:00
" ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
" public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
2019-03-07 00:16:49 +00:00
" v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
2021-09-21 15:51:26 +00:00
" use_redis = EXCLUDED.use_redis " ,
10 , // number of parameters
NULL , // oid field. ignore
values , // values for substitution
NULL , // lengths in bytes of each value
NULL , // binary?
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " Heartbeat Update Failed: %s \n " , PQresultErrorMessage ( res ) ) ;
}
PQclear ( res ) ;
2018-11-30 18:37:27 +00:00
}
2021-09-21 15:51:26 +00:00
// if (_rc != NULL) {
// if (_rc->clusterMode) {
// _cluster->zadd("controllers", controllerId, ts);
// } else {
// _redis->zadd("controllers", controllerId, ts);
// }
// }
2018-11-30 18:37:27 +00:00
2018-08-31 21:58:15 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 1000 ) ) ;
}
2021-09-21 15:51:26 +00:00
PQfinish ( conn ) ;
conn = NULL ;
2020-05-12 18:56:19 +00:00
fprintf ( stderr , " Exited heartbeat thread \n " ) ;
2018-08-31 21:58:15 +00:00
}
void PostgreSQL : : membersDbWatcher ( )
{
2021-09-21 15:51:26 +00:00
PGconn * conn = getPgConn ( NO_OVERRIDE ) ;
if ( PQstatus ( conn ) = = CONNECTION_BAD ) {
fprintf ( stderr , " Connection to database failed: %s \n " , PQerrorMessage ( conn ) ) ;
PQfinish ( conn ) ;
exit ( 1 ) ;
}
2020-05-12 18:56:19 +00:00
if ( _rc ) {
2021-09-21 15:51:26 +00:00
PQfinish ( conn ) ;
conn = NULL ;
2020-05-12 18:56:19 +00:00
_membersWatcher_Redis ( ) ;
2019-03-05 01:01:16 +00:00
} else {
2021-09-21 15:51:26 +00:00
_membersWatcher_Postgres ( conn ) ;
PQfinish ( conn ) ;
conn = NULL ;
2019-03-05 01:01:16 +00:00
}
if ( _run = = 1 ) {
fprintf ( stderr , " ERROR: %s membersDbWatcher should still be running! Exiting Controller. \n " , _myAddressStr . c_str ( ) ) ;
exit ( 9 ) ;
}
2019-03-08 18:50:33 +00:00
fprintf ( stderr , " Exited membersDbWatcher \n " ) ;
2019-03-05 01:01:16 +00:00
}
2021-09-21 15:51:26 +00:00
void PostgreSQL : : _membersWatcher_Postgres ( PGconn * conn ) {
char buf [ 11 ] = { 0 } ;
std : : string cmd = " LISTEN member_ " + std : : string ( _myAddress . toString ( buf ) ) ;
fprintf ( stderr , " Listening to member stream: %s \n " , cmd . c_str ( ) ) ;
PGresult * res = PQexec ( conn , cmd . c_str ( ) ) ;
if ( ! res | | PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " LISTEN command failed: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
PQfinish ( conn ) ;
exit ( 1 ) ;
}
2018-08-31 21:58:15 +00:00
2021-09-21 15:51:26 +00:00
PQclear ( res ) ; res = NULL ;
2018-08-31 21:58:15 +00:00
2021-09-21 15:20:15 +00:00
while ( _run = = 1 ) {
2021-09-21 15:51:26 +00:00
if ( PQstatus ( conn ) ! = CONNECTION_OK ) {
fprintf ( stderr , " ERROR: Member Watcher lost connection to Postgres. " ) ;
exit ( - 1 ) ;
}
PGnotify * notify = NULL ;
PQconsumeInput ( conn ) ;
while ( ( notify = PQnotifies ( conn ) ) ! = NULL ) {
//fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
2021-09-21 15:20:15 +00:00
2021-09-21 15:51:26 +00:00
try {
json tmp ( json : : parse ( notify - > extra ) ) ;
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_memberChanged ( oldConfig , newConfig , ( this - > _ready > = 2 ) ) ;
}
} catch ( . . . ) { } // ignore bad records
free ( notify ) ;
}
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 10 ) ) ;
}
2019-03-05 01:01:16 +00:00
}
2020-05-12 18:56:19 +00:00
void PostgreSQL : : _membersWatcher_Redis ( ) {
char buf [ 11 ] = { 0 } ;
std : : string key = " member-stream:{ " + std : : string ( _myAddress . toString ( buf ) ) + " } " ;
2020-07-28 01:37:45 +00:00
fprintf ( stderr , " Listening to member stream: %s \n " , key . c_str ( ) ) ;
2020-05-12 18:56:19 +00:00
while ( _run = = 1 ) {
2020-05-20 18:42:51 +00:00
try {
json tmp ;
std : : unordered_map < std : : string , ItemStream > result ;
if ( _rc - > clusterMode ) {
_cluster - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
} else {
_redis - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
}
if ( ! result . empty ( ) ) {
for ( auto element : result ) {
# ifdef ZT_TRACE
fprintf ( stdout , " Received notification from: %s \n " , element . first . c_str ( ) ) ;
# endif
for ( auto rec : element . second ) {
std : : string id = rec . first ;
auto attrs = rec . second ;
# ifdef ZT_TRACE
fprintf ( stdout , " Record ID: %s \n " , id . c_str ( ) ) ;
fprintf ( stdout , " attrs len: %lu \n " , attrs . size ( ) ) ;
# endif
for ( auto a : attrs ) {
# ifdef ZT_TRACE
fprintf ( stdout , " key: %s \n value: %s \n " , a . first . c_str ( ) , a . second . c_str ( ) ) ;
# endif
try {
tmp = json : : parse ( a . second ) ;
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_memberChanged ( oldConfig , newConfig , ( this - > _ready > = 2 ) ) ;
}
} catch ( . . . ) {
fprintf ( stderr , " json parse error in networkWatcher_Redis \n " ) ;
2020-05-12 18:56:19 +00:00
}
}
2020-05-20 18:42:51 +00:00
if ( _rc - > clusterMode ) {
_cluster - > xdel ( key , id ) ;
} else {
_redis - > xdel ( key , id ) ;
}
2020-05-12 19:37:05 +00:00
}
2020-05-12 18:56:19 +00:00
}
}
2020-05-20 18:42:51 +00:00
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " Error in Redis members watcher: %s \n " , e . what ( ) ) ;
2020-05-12 18:56:19 +00:00
}
}
fprintf ( stderr , " membersWatcher ended \n " ) ;
2020-05-11 18:48:05 +00:00
}
2018-08-31 21:58:15 +00:00
void PostgreSQL : : networksDbWatcher ( )
{
2021-09-21 15:51:26 +00:00
PGconn * conn = getPgConn ( NO_OVERRIDE ) ;
if ( PQstatus ( conn ) = = CONNECTION_BAD ) {
fprintf ( stderr , " Connection to database failed: %s \n " , PQerrorMessage ( conn ) ) ;
PQfinish ( conn ) ;
exit ( 1 ) ;
}
2020-05-11 22:03:56 +00:00
if ( _rc ) {
2021-09-21 15:51:26 +00:00
PQfinish ( conn ) ;
conn = NULL ;
2020-05-11 22:03:56 +00:00
_networksWatcher_Redis ( ) ;
2019-03-05 01:01:16 +00:00
} else {
2021-09-21 15:51:26 +00:00
_networksWatcher_Postgres ( conn ) ;
PQfinish ( conn ) ;
conn = NULL ;
2019-03-05 01:01:16 +00:00
}
2019-08-23 16:23:39 +00:00
2019-03-05 01:01:16 +00:00
if ( _run = = 1 ) {
fprintf ( stderr , " ERROR: %s networksDbWatcher should still be running! Exiting Controller. \n " , _myAddressStr . c_str ( ) ) ;
exit ( 8 ) ;
}
2019-11-13 20:46:16 +00:00
fprintf ( stderr , " Exited networksDbWatcher \n " ) ;
2019-03-05 01:01:16 +00:00
}
2021-09-21 15:51:26 +00:00
void PostgreSQL : : _networksWatcher_Postgres ( PGconn * conn ) {
char buf [ 11 ] = { 0 } ;
std : : string cmd = " LISTEN network_ " + std : : string ( _myAddress . toString ( buf ) ) ;
PGresult * res = PQexec ( conn , cmd . c_str ( ) ) ;
if ( ! res | | PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " LISTEN command failed: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
PQfinish ( conn ) ;
exit ( 1 ) ;
}
2019-03-07 00:16:49 +00:00
2021-09-21 15:51:26 +00:00
PQclear ( res ) ; res = NULL ;
2019-03-07 00:16:49 +00:00
2018-11-30 18:37:27 +00:00
while ( _run = = 1 ) {
2021-09-21 15:51:26 +00:00
if ( PQstatus ( conn ) ! = CONNECTION_OK ) {
fprintf ( stderr , " ERROR: Network Watcher lost connection to Postgres. " ) ;
exit ( - 1 ) ;
}
PGnotify * notify = NULL ;
PQconsumeInput ( conn ) ;
while ( ( notify = PQnotifies ( conn ) ) ! = NULL ) {
//fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
try {
json tmp ( json : : parse ( notify - > extra ) ) ;
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_networkChanged ( oldConfig , newConfig , ( this - > _ready > = 2 ) ) ;
}
} catch ( . . . ) { } // ignore bad records
free ( notify ) ;
}
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 10 ) ) ;
2018-11-30 18:37:27 +00:00
}
2019-03-05 01:01:16 +00:00
}
2020-05-11 18:48:05 +00:00
void PostgreSQL : : _networksWatcher_Redis ( ) {
2020-05-12 18:56:19 +00:00
char buf [ 11 ] = { 0 } ;
std : : string key = " network-stream:{ " + std : : string ( _myAddress . toString ( buf ) ) + " } " ;
while ( _run = = 1 ) {
2020-05-20 18:42:51 +00:00
try {
json tmp ;
std : : unordered_map < std : : string , ItemStream > result ;
if ( _rc - > clusterMode ) {
_cluster - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
} else {
_redis - > xread ( key , " $ " , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
}
if ( ! result . empty ( ) ) {
for ( auto element : result ) {
2020-05-12 19:48:58 +00:00
# ifdef ZT_TRACE
2020-05-20 18:42:51 +00:00
fprintf ( stdout , " Received notification from: %s \n " , element . first . c_str ( ) ) ;
2020-05-12 19:48:58 +00:00
# endif
2020-05-20 18:42:51 +00:00
for ( auto rec : element . second ) {
std : : string id = rec . first ;
auto attrs = rec . second ;
2020-05-12 19:48:58 +00:00
# ifdef ZT_TRACE
2020-05-20 18:42:51 +00:00
fprintf ( stdout , " Record ID: %s \n " , id . c_str ( ) ) ;
fprintf ( stdout , " attrs len: %lu \n " , attrs . size ( ) ) ;
2020-05-12 19:48:58 +00:00
# endif
2020-05-20 18:42:51 +00:00
for ( auto a : attrs ) {
2020-05-12 19:48:58 +00:00
# ifdef ZT_TRACE
2020-05-20 18:42:51 +00:00
fprintf ( stdout , " key: %s \n value: %s \n " , a . first . c_str ( ) , a . second . c_str ( ) ) ;
2020-05-12 19:48:58 +00:00
# endif
2020-05-20 18:42:51 +00:00
try {
tmp = json : : parse ( a . second ) ;
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_networkChanged ( oldConfig , newConfig , ( this - > _ready > = 2 ) ) ;
}
} catch ( . . . ) {
fprintf ( stderr , " json parse error in networkWatcher_Redis \n " ) ;
2020-05-12 18:56:19 +00:00
}
}
2020-05-20 18:42:51 +00:00
if ( _rc - > clusterMode ) {
_cluster - > xdel ( key , id ) ;
} else {
_redis - > xdel ( key , id ) ;
}
2020-05-12 19:37:05 +00:00
}
2020-05-12 18:56:19 +00:00
}
}
2020-05-20 18:42:51 +00:00
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " Error in Redis networks watcher: %s \n " , e . what ( ) ) ;
2020-05-12 18:56:19 +00:00
}
}
fprintf ( stderr , " networksWatcher ended \n " ) ;
2020-05-11 18:48:05 +00:00
}
2018-08-31 21:58:15 +00:00
void PostgreSQL : : commitThread ( )
{
2021-09-21 15:51:26 +00:00
PGconn * conn = getPgConn ( ) ;
if ( PQstatus ( conn ) = = CONNECTION_BAD ) {
fprintf ( stderr , " ERROR: Connection to database failed: %s \n " , PQerrorMessage ( conn ) ) ;
PQfinish ( conn ) ;
exit ( 1 ) ;
}
2019-08-06 15:42:54 +00:00
std : : pair < nlohmann : : json , bool > qitem ;
while ( _commitQueue . get ( qitem ) & ( _run = = 1 ) ) {
if ( ! qitem . first . is_object ( ) ) {
2018-08-31 21:58:15 +00:00
continue ;
}
2021-09-21 15:51:26 +00:00
if ( PQstatus ( conn ) = = CONNECTION_BAD ) {
fprintf ( stderr , " ERROR: Connection to database failed: %s \n " , PQerrorMessage ( conn ) ) ;
PQfinish ( conn ) ;
exit ( 1 ) ;
}
2019-08-06 15:42:54 +00:00
try {
nlohmann : : json * config = & ( qitem . first ) ;
2018-09-05 18:49:07 +00:00
const std : : string objtype = ( * config ) [ " objtype " ] ;
if ( objtype = = " member " ) {
2018-11-14 00:00:13 +00:00
try {
2020-10-05 20:32:47 +00:00
std : : string memberId = ( * config ) [ " id " ] ;
std : : string networkId = ( * config ) [ " nwid " ] ;
2021-09-21 15:51:26 +00:00
std : : string identity = ( * config ) [ " identity " ] ;
2020-10-05 20:32:47 +00:00
std : : string target = " NULL " ;
2021-09-21 15:51:26 +00:00
2020-10-05 20:32:47 +00:00
if ( ! ( * config ) [ " remoteTraceTarget " ] . is_null ( ) ) {
target = ( * config ) [ " remoteTraceTarget " ] ;
}
2021-09-21 15:20:15 +00:00
2021-09-21 15:51:26 +00:00
std : : string caps = OSUtils : : jsonDump ( ( * config ) [ " capabilities " ] , - 1 ) ;
std : : string lastAuthTime = std : : to_string ( ( long long ) ( * config ) [ " lastAuthorizedTime " ] ) ;
std : : string lastDeauthTime = std : : to_string ( ( long long ) ( * config ) [ " lastDeauthorizedTime " ] ) ;
std : : string rtraceLevel = std : : to_string ( ( int ) ( * config ) [ " remoteTraceLevel " ] ) ;
std : : string rev = std : : to_string ( ( unsigned long long ) ( * config ) [ " revision " ] ) ;
std : : string tags = OSUtils : : jsonDump ( ( * config ) [ " tags " ] , - 1 ) ;
std : : string vmajor = std : : to_string ( ( int ) ( * config ) [ " vMajor " ] ) ;
std : : string vminor = std : : to_string ( ( int ) ( * config ) [ " vMinor " ] ) ;
std : : string vrev = std : : to_string ( ( int ) ( * config ) [ " vRev " ] ) ;
std : : string vproto = std : : to_string ( ( int ) ( * config ) [ " vProto " ] ) ;
const char * values [ 19 ] = {
memberId . c_str ( ) ,
networkId . c_str ( ) ,
( ( * config ) [ " activeBridge " ] ? " true " : " false " ) ,
( ( * config ) [ " authorized " ] ? " true " : " false " ) ,
caps . c_str ( ) ,
identity . c_str ( ) ,
lastAuthTime . c_str ( ) ,
lastDeauthTime . c_str ( ) ,
( ( * config ) [ " noAutoAssignIps " ] ? " true " : " false " ) ,
rtraceLevel . c_str ( ) ,
( target = = " NULL " ) ? NULL : target . c_str ( ) ,
rev . c_str ( ) ,
tags . c_str ( ) ,
vmajor . c_str ( ) ,
vminor . c_str ( ) ,
vrev . c_str ( ) ,
vproto . c_str ( )
} ;
PGresult * res = PQexec ( conn , " BEGIN " ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error beginning update transaction: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
delete config ;
config = nullptr ;
continue ;
}
res = PQexecParams ( conn ,
2020-10-05 20:32:47 +00:00
" INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
" identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
" remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
" VALUES ($1, $2, $3, $4, $5, $6, "
" TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
" $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET "
" active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, "
" identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
" last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
" remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
" revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
" v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto " ,
2021-09-21 15:51:26 +00:00
17 ,
NULL ,
values ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error updating member: %s \n " , PQresultErrorMessage ( res ) ) ;
fprintf ( stderr , " %s " , OSUtils : : jsonDump ( * config , 2 ) . c_str ( ) ) ;
PQclear ( res ) ;
PQclear ( PQexec ( conn , " ROLLBACK " ) ) ;
delete config ;
config = nullptr ;
continue ;
}
PQclear ( res ) ;
const char * v2 [ 2 ] = {
memberId . c_str ( ) ,
networkId . c_str ( )
} ;
res = PQexecParams ( conn ,
" DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2 " ,
2 ,
NULL ,
v2 ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error updating IP address assignments: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
PQclear ( PQexec ( conn , " ROLLBACK " ) ) ; ;
delete config ;
config = nullptr ;
continue ;
}
PQclear ( res ) ;
2020-10-05 18:02:40 +00:00
2020-10-05 20:32:47 +00:00
std : : vector < std : : string > assignments ;
bool ipAssignError = false ;
for ( auto i = ( * config ) [ " ipAssignments " ] . begin ( ) ; i ! = ( * config ) [ " ipAssignments " ] . end ( ) ; + + i ) {
std : : string addr = * i ;
2020-10-05 18:02:40 +00:00
2020-10-05 20:32:47 +00:00
if ( std : : find ( assignments . begin ( ) , assignments . end ( ) , addr ) ! = assignments . end ( ) ) {
2020-10-05 18:02:40 +00:00
continue ;
}
2021-09-21 15:51:26 +00:00
const char * v3 [ 3 ] = {
memberId . c_str ( ) ,
networkId . c_str ( ) ,
addr . c_str ( )
} ;
2021-09-21 15:20:15 +00:00
2021-09-21 15:51:26 +00:00
res = PQexecParams ( conn ,
" INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING " ,
3 ,
NULL ,
v3 ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error setting IP addresses for member: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
PQclear ( PQexec ( conn , " ROLLBACK " ) ) ;
ipAssignError = true ;
break ;
}
PQclear ( res ) ;
2020-10-05 20:32:47 +00:00
assignments . push_back ( addr ) ;
}
if ( ipAssignError ) {
delete config ;
config = nullptr ;
continue ;
}
2021-09-21 15:51:26 +00:00
res = PQexec ( conn , " COMMIT " ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error committing member transaction: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
PQclear ( PQexec ( conn , " ROLLBACK " ) ) ;
delete config ;
config = nullptr ;
continue ;
}
2018-10-24 19:06:17 +00:00
2018-11-30 18:37:27 +00:00
const uint64_t nwidInt = OSUtils : : jsonIntHex ( ( * config ) [ " nwid " ] , 0ULL ) ;
const uint64_t memberidInt = OSUtils : : jsonIntHex ( ( * config ) [ " id " ] , 0ULL ) ;
if ( nwidInt & & memberidInt ) {
nlohmann : : json nwOrig ;
nlohmann : : json memOrig ;
nlohmann : : json memNew ( * config ) ;
2019-08-06 15:42:54 +00:00
2018-11-30 18:37:27 +00:00
get ( nwidInt , nwOrig , memberidInt , memOrig ) ;
2019-08-23 16:23:39 +00:00
2019-08-06 15:42:54 +00:00
_memberChanged ( memOrig , memNew , qitem . second ) ;
2018-11-30 18:37:27 +00:00
} else {
2021-09-21 15:51:26 +00:00
fprintf ( stderr , " Can't notify of change. Error parsing nwid or memberid: %llu-%llu \n " , ( unsigned long long ) nwidInt , ( unsigned long long ) memberidInt ) ;
2018-11-30 18:37:27 +00:00
}
} catch ( std : : exception & e ) {
2021-09-21 15:51:26 +00:00
fprintf ( stderr , " ERROR: Error updating member: %s \n " , e . what ( ) ) ;
2018-09-04 23:05:34 +00:00
}
2018-09-05 18:49:07 +00:00
} else if ( objtype = = " network " ) {
try {
2020-10-05 20:32:47 +00:00
std : : string id = ( * config ) [ " id " ] ;
2021-09-21 15:51:26 +00:00
std : : string controllerId = _myAddressStr . c_str ( ) ;
std : : string name = ( * config ) [ " name " ] ;
std : : string remoteTraceTarget ( " NULL " ) ;
if ( ! ( * config ) [ " remoteTraceTarget " ] . is_null ( ) ) {
2020-10-05 20:32:47 +00:00
remoteTraceTarget = ( * config ) [ " remoteTraceTarget " ] ;
}
2021-09-21 15:51:26 +00:00
std : : string rulesSource ;
2020-10-05 20:32:47 +00:00
if ( ( * config ) [ " rulesSource " ] . is_string ( ) ) {
rulesSource = ( * config ) [ " rulesSource " ] ;
}
2021-09-21 15:51:26 +00:00
std : : string caps = OSUtils : : jsonDump ( ( * config ) [ " capabilitles " ] , - 1 ) ;
std : : string now = std : : to_string ( OSUtils : : now ( ) ) ;
std : : string mtu = std : : to_string ( ( int ) ( * config ) [ " mtu " ] ) ;
std : : string mcastLimit = std : : to_string ( ( int ) ( * config ) [ " multicastLimit " ] ) ;
std : : string rtraceLevel = std : : to_string ( ( int ) ( * config ) [ " remoteTraceLevel " ] ) ;
std : : string rules = OSUtils : : jsonDump ( ( * config ) [ " rules " ] , - 1 ) ;
std : : string tags = OSUtils : : jsonDump ( ( * config ) [ " tags " ] , - 1 ) ;
std : : string v4mode = OSUtils : : jsonDump ( ( * config ) [ " v4AssignMode " ] , - 1 ) ;
std : : string v6mode = OSUtils : : jsonDump ( ( * config ) [ " v6AssignMode " ] , - 1 ) ;
bool enableBroadcast = ( * config ) [ " enableBroadcast " ] ;
bool isPrivate = ( * config ) [ " private " ] ;
const char * values [ 16 ] = {
id . c_str ( ) ,
controllerId . c_str ( ) ,
caps . c_str ( ) ,
enableBroadcast ? " true " : " false " ,
now . c_str ( ) ,
mtu . c_str ( ) ,
mcastLimit . c_str ( ) ,
name . c_str ( ) ,
isPrivate ? " true " : " false " ,
rtraceLevel . c_str ( ) ,
( remoteTraceTarget = = " NULL " ? NULL : remoteTraceTarget . c_str ( ) ) ,
rules . c_str ( ) ,
rulesSource . c_str ( ) ,
tags . c_str ( ) ,
v4mode . c_str ( ) ,
v6mode . c_str ( ) ,
} ;
PGresult * res = PQexec ( conn , " BEGIN " ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error beginnning transaction: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
delete config ;
config = nullptr ;
continue ;
}
PQclear ( res ) ;
2018-11-30 18:37:27 +00:00
2020-10-05 20:32:47 +00:00
// This ugly query exists because when we want to mirror networks to/from
// another data store (e.g. FileDB or LFDB) it is possible to get a network
// that doesn't exist in Central's database. This does an upsert and sets
// the owner_id to the "first" global admin in the user DB if the record
// did not previously exist. If the record already exists owner_id is left
// unchanged, so owner_id should be left out of the update clause.
2021-09-21 15:51:26 +00:00
res = PQexecParams ( conn ,
2020-10-05 20:32:47 +00:00
" INSERT INTO ztc_network (id, creation_time, owner_id, controller_id, capabilities, enable_broadcast, "
" last_modified, mtu, multicast_limit, name, private, "
" remote_trace_level, remote_trace_target, rules, rules_source, "
2021-09-21 15:51:26 +00:00
" tags, v4_assign_mode, v6_assign_mode) VALUES ( "
2020-10-05 20:32:47 +00:00
" $1, TO_TIMESTAMP($5::double precision/1000), "
" (SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true AND modify = true AND read = true LIMIT 1), "
" $2, $3, $4, TO_TIMESTAMP($5::double precision/1000), "
2021-09-21 15:51:26 +00:00
" $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) "
2020-10-05 20:32:47 +00:00
" ON CONFLICT (id) DO UPDATE set controller_id = EXCLUDED.controller_id, "
" capabilities = EXCLUDED.capabilities, enable_broadcast = EXCLUDED.enable_broadcast, "
" last_modified = EXCLUDED.last_modified, mtu = EXCLUDED.mtu, "
" multicast_limit = EXCLUDED.multicast_limit, name = EXCLUDED.name, "
" private = EXCLUDED.private, remote_trace_level = EXCLUDED.remote_trace_level, "
" remote_trace_target = EXCLUDED.remote_trace_target, rules = EXCLUDED.rules, "
" rules_source = EXCLUDED.rules_source, tags = EXCLUDED.tags, "
2021-09-21 15:51:26 +00:00
" v4_assign_mode = EXCLUDED.v4_assign_mode, v6_assign_mode = EXCLUDED.v6_assign_mode " ,
16 ,
NULL ,
values ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error updating network record: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
PQclear ( PQexec ( conn , " ROLLBACK " ) ) ;
delete config ;
config = nullptr ;
continue ;
}
PQclear ( res ) ;
const char * params [ 1 ] = {
id . c_str ( )
} ;
res = PQexecParams ( conn ,
" DELETE FROM ztc_network_assignment_pool WHERE network_id = $1 " ,
1 ,
NULL ,
params ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error updating assignment pool: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
PQclear ( PQexec ( conn , " ROLLBACK " ) ) ;
delete config ;
config = nullptr ;
continue ;
}
PQclear ( res ) ;
2020-10-05 20:32:47 +00:00
auto pool = ( * config ) [ " ipAssignmentPools " ] ;
bool err = false ;
for ( auto i = pool . begin ( ) ; i ! = pool . end ( ) ; + + i ) {
std : : string start = ( * i ) [ " ipRangeStart " ] ;
std : : string end = ( * i ) [ " ipRangeEnd " ] ;
2021-09-21 15:51:26 +00:00
const char * p [ 3 ] = {
id . c_str ( ) ,
start . c_str ( ) ,
end . c_str ( )
} ;
2018-11-30 18:37:27 +00:00
2021-09-21 15:51:26 +00:00
res = PQexecParams ( conn ,
2020-10-05 20:32:47 +00:00
" INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
2021-09-21 15:51:26 +00:00
" VALUES ($1, $2, $3) " ,
3 ,
NULL ,
p ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error updating assignment pool: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
err = true ;
break ;
}
PQclear ( res ) ;
}
if ( err ) {
PQclear ( res ) ;
PQclear ( PQexec ( conn , " ROLLBACK " ) ) ;
delete config ;
config = nullptr ;
continue ;
}
res = PQexecParams ( conn ,
" DELETE FROM ztc_network_route WHERE network_id = $1 " ,
1 ,
NULL ,
params ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error updating routes: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
PQclear ( PQexec ( conn , " ROLLBACK " ) ) ;
delete config ;
config = nullptr ;
continue ;
2020-10-05 20:32:47 +00:00
}
2020-10-05 18:02:40 +00:00
2020-10-05 20:32:47 +00:00
auto routes = ( * config ) [ " routes " ] ;
err = false ;
for ( auto i = routes . begin ( ) ; i ! = routes . end ( ) ; + + i ) {
std : : string t = ( * i ) [ " target " ] ;
std : : vector < std : : string > target ;
std : : istringstream f ( t ) ;
std : : string s ;
while ( std : : getline ( f , s , ' / ' ) ) {
target . push_back ( s ) ;
2018-09-05 18:49:07 +00:00
}
2020-10-05 20:32:47 +00:00
if ( target . empty ( ) | | target . size ( ) ! = 2 ) {
2018-09-05 18:49:07 +00:00
continue ;
}
2020-10-05 20:32:47 +00:00
std : : string targetAddr = target [ 0 ] ;
std : : string targetBits = target [ 1 ] ;
std : : string via = " NULL " ;
if ( ! ( * i ) [ " via " ] . is_null ( ) ) {
via = ( * i ) [ " via " ] ;
2018-09-05 18:49:07 +00:00
}
2018-11-30 18:37:27 +00:00
2021-09-21 15:51:26 +00:00
const char * p [ 4 ] = {
id . c_str ( ) ,
targetAddr . c_str ( ) ,
targetBits . c_str ( ) ,
( via = = " NULL " ? NULL : via . c_str ( ) ) ,
} ;
res = PQexecParams ( conn ,
" INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4) " ,
4 ,
NULL ,
p ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error updating routes: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
err = true ;
break ;
}
PQclear ( res ) ;
2020-10-05 20:32:47 +00:00
}
if ( err ) {
2021-09-21 15:51:26 +00:00
PQclear ( PQexec ( conn , " ROLLBACK " ) ) ;
2020-10-05 20:32:47 +00:00
delete config ;
config = nullptr ;
continue ;
}
auto dns = ( * config ) [ " dns " ] ;
std : : string domain = dns [ " domain " ] ;
std : : stringstream servers ;
servers < < " { " ;
for ( auto j = dns [ " servers " ] . begin ( ) ; j < dns [ " servers " ] . end ( ) ; + + j ) {
servers < < * j ;
if ( ( j + 1 ) ! = dns [ " servers " ] . end ( ) ) {
servers < < " , " ;
2020-10-05 18:02:40 +00:00
}
2020-10-05 20:32:47 +00:00
}
servers < < " } " ;
2021-09-21 15:51:26 +00:00
const char * p [ 3 ] = {
id . c_str ( ) ,
domain . c_str ( ) ,
servers . str ( ) . c_str ( )
} ;
res = PQexecParams ( conn , " INSERT INTO ztc_network_dns (network_id, domain, servers) VALUES ($1, $2, $3) ON CONFLICT (network_id) DO UPDATE SET domain = EXCLUDED.domain, servers = EXCLUDED.servers " ,
3 ,
NULL ,
p ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error updating DNS: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
PQclear ( PQexec ( conn , " ROLLBACK " ) ) ;
err = true ;
break ;
}
PQclear ( res ) ;
2021-09-21 15:20:15 +00:00
2021-09-21 15:51:26 +00:00
res = PQexec ( conn , " COMMIT " ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error committing network update: %s \n " , PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
PQclear ( PQexec ( conn , " ROLLBACK " ) ) ;
delete config ;
config = nullptr ;
continue ;
}
PQclear ( res ) ;
2018-10-24 19:06:17 +00:00
2018-11-30 18:37:27 +00:00
const uint64_t nwidInt = OSUtils : : jsonIntHex ( ( * config ) [ " nwid " ] , 0ULL ) ;
if ( nwidInt ) {
nlohmann : : json nwOrig ;
nlohmann : : json nwNew ( * config ) ;
2018-10-24 19:06:17 +00:00
2018-11-30 18:37:27 +00:00
get ( nwidInt , nwOrig ) ;
2018-10-24 19:06:17 +00:00
2019-08-06 15:42:54 +00:00
_networkChanged ( nwOrig , nwNew , qitem . second ) ;
2018-11-30 18:37:27 +00:00
} else {
2021-09-21 15:51:26 +00:00
fprintf ( stderr , " Can't notify network changed: %llu \n " , ( unsigned long long ) nwidInt ) ;
2018-11-30 18:37:27 +00:00
}
2018-10-24 19:06:17 +00:00
2018-11-30 18:37:27 +00:00
} catch ( std : : exception & e ) {
2021-09-21 15:51:26 +00:00
fprintf ( stderr , " ERROR: Error updating member: %s \n " , e . what ( ) ) ;
2018-09-05 18:30:17 +00:00
}
2021-09-21 15:51:26 +00:00
// if (_rc != NULL) {
// try {
// std::string id = (*config)["id"];
// std::string controllerId = _myAddressStr.c_str();
// std::string key = "networks:{" + controllerId + "}";
// if (_rc->clusterMode) {
// _cluster->sadd(key, id);
// } else {
// _redis->sadd(key, id);
// }
// } catch (sw::redis::Error &e) {
// fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
// }
// }
2018-09-05 18:49:07 +00:00
} else if ( objtype = = " _delete_network " ) {
try {
std : : string networkId = ( * config ) [ " nwid " ] ;
2021-09-21 15:51:26 +00:00
const char * values [ 1 ] = {
networkId . c_str ( )
} ;
PGresult * res = PQexecParams ( conn ,
" UPDATE ztc_network SET deleted = true WHERE id = $1 " ,
1 ,
NULL ,
values ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error deleting network: %s \n " , PQresultErrorMessage ( res ) ) ;
}
2018-11-30 18:37:27 +00:00
2021-09-21 15:51:26 +00:00
PQclear ( res ) ;
2018-09-05 18:49:07 +00:00
} catch ( std : : exception & e ) {
2021-09-21 15:51:26 +00:00
fprintf ( stderr , " ERROR: Error deleting network: %s \n " , e . what ( ) ) ;
2018-09-05 18:30:17 +00:00
}
2021-09-21 15:51:26 +00:00
// if (_rc != NULL) {
// try {
// std::string id = (*config)["id"];
// std::string controllerId = _myAddressStr.c_str();
// std::string key = "networks:{" + controllerId + "}";
// if (_rc->clusterMode) {
// _cluster->srem(key, id);
// _cluster->del("network-nodes-online:{"+controllerId+"}:"+id);
// } else {
// _redis->srem(key, id);
// _redis->del("network-nodes-online:{"+controllerId+"}:"+id);
// }
// } catch (sw::redis::Error &e) {
// fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
// }
// }
2018-09-05 18:49:07 +00:00
} else if ( objtype = = " _delete_member " ) {
try {
std : : string memberId = ( * config ) [ " id " ] ;
std : : string networkId = ( * config ) [ " nwid " ] ;
2021-09-21 15:51:26 +00:00
const char * values [ 2 ] = {
memberId . c_str ( ) ,
networkId . c_str ( )
} ;
PGresult * res = PQexecParams ( conn ,
2018-11-30 18:37:27 +00:00
" UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2 " ,
2021-09-21 15:51:26 +00:00
2 ,
NULL ,
values ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " ERROR: Error deleting member: %s \n " , PQresultErrorMessage ( res ) ) ;
}
2018-11-30 18:37:27 +00:00
2021-09-21 15:51:26 +00:00
PQclear ( res ) ;
2018-09-05 18:49:07 +00:00
} catch ( std : : exception & e ) {
2021-09-21 15:51:26 +00:00
fprintf ( stderr , " ERROR: Error deleting member: %s \n " , e . what ( ) ) ;
2018-09-05 18:30:17 +00:00
}
2021-09-21 15:51:26 +00:00
// if (_rc != NULL) {
// try {
// std::string memberId = (*config)["id"];
// std::string networkId = (*config)["nwid"];
// std::string controllerId = _myAddressStr.c_str();
// std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
// if (_rc->clusterMode) {
// _cluster->srem(key, memberId);
// _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
// } else {
// _redis->srem(key, memberId);
// _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
// }
// } catch (sw::redis::Error &e) {
// fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
// }
// }
2018-09-05 18:49:07 +00:00
} else {
2021-09-21 15:51:26 +00:00
fprintf ( stderr , " ERROR: unknown objtype " ) ;
2018-09-04 23:05:34 +00:00
}
2018-09-05 18:49:07 +00:00
} catch ( std : : exception & e ) {
2021-09-21 15:51:26 +00:00
fprintf ( stderr , " ERROR: Error getting objtype: %s \n " , e . what ( ) ) ;
2018-09-04 23:05:34 +00:00
}
2021-09-21 15:51:26 +00:00
2020-10-05 20:32:47 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 100 ) ) ;
2018-08-31 21:58:15 +00:00
}
2018-11-30 18:37:27 +00:00
2021-09-21 15:51:26 +00:00
PQfinish ( conn ) ;
if ( _run = = 1 ) {
fprintf ( stderr , " ERROR: %s commitThread should still be running! Exiting Controller. \n " , _myAddressStr . c_str ( ) ) ;
exit ( 7 ) ;
}
fprintf ( stderr , " commitThread finished \n " ) ;
2018-08-31 21:58:15 +00:00
}
void PostgreSQL : : onlineNotificationThread ( )
2020-05-14 00:23:27 +00:00
{
2020-05-20 23:28:28 +00:00
waitForReady ( ) ;
2021-09-21 15:51:26 +00:00
// if (_rc != NULL) {
// onlineNotification_Redis();
// } else {
onlineNotification_Postgres ( ) ;
// }
2020-05-29 02:22:07 +00:00
}
void PostgreSQL : : onlineNotification_Postgres ( )
{
2021-09-21 15:51:26 +00:00
PGconn * conn = getPgConn ( ) ;
if ( PQstatus ( conn ) = = CONNECTION_BAD ) {
fprintf ( stderr , " Connection to database failed: %s \n " , PQerrorMessage ( conn ) ) ;
PQfinish ( conn ) ;
exit ( 1 ) ;
}
2018-08-31 21:58:15 +00:00
_connected = 1 ;
2020-05-27 02:00:55 +00:00
nlohmann : : json jtmp1 , jtmp2 ;
2020-05-29 02:22:07 +00:00
while ( _run = = 1 ) {
2021-09-21 15:51:26 +00:00
if ( PQstatus ( conn ) ! = CONNECTION_OK ) {
fprintf ( stderr , " ERROR: Online Notification thread lost connection to Postgres. " ) ;
PQfinish ( conn ) ;
exit ( 5 ) ;
}
2019-08-23 16:23:39 +00:00
2021-09-21 15:51:26 +00:00
std : : unordered_map < std : : pair < uint64_t , uint64_t > , std : : pair < int64_t , InetAddress > , _PairHasher > lastOnline ;
{
std : : lock_guard < std : : mutex > l ( _lastOnline_l ) ;
lastOnline . swap ( _lastOnline ) ;
}
2019-04-04 19:40:49 +00:00
2021-09-21 15:51:26 +00:00
PGresult * res = NULL ;
2021-09-21 15:20:15 +00:00
2021-09-21 15:51:26 +00:00
std : : stringstream memberUpdate ;
memberUpdate < < " INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES " ;
bool firstRun = true ;
bool memberAdded = false ;
for ( auto i = lastOnline . begin ( ) ; i ! = lastOnline . end ( ) ; + + i ) {
uint64_t nwid_i = i - > first . first ;
char nwidTmp [ 64 ] ;
char memTmp [ 64 ] ;
char ipTmp [ 64 ] ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
OSUtils : : ztsnprintf ( memTmp , sizeof ( memTmp ) , " %.10llx " , i - > first . second ) ;
2019-04-04 19:40:49 +00:00
2021-09-21 15:51:26 +00:00
if ( ! get ( nwid_i , jtmp1 , i - > first . second , jtmp2 ) ) {
continue ; // skip non existent networks/members
}
std : : string networkId ( nwidTmp ) ;
std : : string memberId ( memTmp ) ;
const char * qvals [ 2 ] = {
networkId . c_str ( ) ,
memberId . c_str ( )
} ;
res = PQexecParams ( conn ,
" SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2 " ,
2 ,
NULL ,
qvals ,
NULL ,
NULL ,
0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK ) {
fprintf ( stderr , " Member count failed: %s " , PQerrorMessage ( conn ) ) ;
PQclear ( res ) ;
continue ;
}
int nrows = PQntuples ( res ) ;
PQclear ( res ) ;
if ( nrows = = 1 ) {
2019-04-04 19:40:49 +00:00
int64_t ts = i - > second . first ;
std : : string ipAddr = i - > second . second . toIpString ( ipTmp ) ;
std : : string timestamp = std : : to_string ( ts ) ;
2019-08-23 16:23:39 +00:00
2019-04-04 19:40:49 +00:00
if ( firstRun ) {
firstRun = false ;
} else {
memberUpdate < < " , " ;
}
2019-04-04 22:11:01 +00:00
memberUpdate < < " (' " < < networkId < < " ', ' " < < memberId < < " ', " ;
if ( ipAddr . empty ( ) ) {
memberUpdate < < " NULL, " ;
} else {
memberUpdate < < " ' " < < ipAddr < < " ', " ;
}
memberUpdate < < " TO_TIMESTAMP( " < < timestamp < < " ::double precision/1000)) " ;
2019-04-04 19:40:49 +00:00
memberAdded = true ;
2021-09-21 15:51:26 +00:00
} else if ( nrows > 1 ) {
fprintf ( stderr , " nrows > 1?!? " ) ;
continue ;
} else {
continue ;
2019-04-04 19:40:49 +00:00
}
2021-09-21 15:51:26 +00:00
}
memberUpdate < < " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated; " ;
2019-04-04 19:40:49 +00:00
2021-09-21 15:51:26 +00:00
if ( memberAdded ) {
res = PQexec ( conn , memberUpdate . str ( ) . c_str ( ) ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK ) {
fprintf ( stderr , " Multiple insert failed: %s " , PQerrorMessage ( conn ) ) ;
2019-04-04 19:40:49 +00:00
}
2021-09-21 15:51:26 +00:00
PQclear ( res ) ;
}
2018-09-06 22:14:16 +00:00
2020-09-10 20:18:25 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 10 ) ) ;
2020-05-29 02:22:07 +00:00
}
fprintf ( stderr , " %s: Fell out of run loop in onlineNotificationThread \n " , _myAddressStr . c_str ( ) ) ;
2021-09-21 15:51:26 +00:00
PQfinish ( conn ) ;
2020-05-29 02:22:07 +00:00
if ( _run = = 1 ) {
fprintf ( stderr , " ERROR: %s onlineNotificationThread should still be running! Exiting Controller. \n " , _myAddressStr . c_str ( ) ) ;
exit ( 6 ) ;
}
2018-08-31 21:58:15 +00:00
}
2018-12-03 23:19:15 +00:00
2020-05-14 00:23:27 +00:00
void PostgreSQL : : onlineNotification_Redis ( )
{
_connected = 1 ;
char buf [ 11 ] = { 0 } ;
std : : string controllerId = std : : string ( _myAddress . toString ( buf ) ) ;
while ( _run = = 1 ) {
std : : unordered_map < std : : pair < uint64_t , uint64_t > , std : : pair < int64_t , InetAddress > , _PairHasher > lastOnline ;
{
std : : lock_guard < std : : mutex > l ( _lastOnline_l ) ;
lastOnline . swap ( _lastOnline ) ;
}
2020-05-20 18:38:04 +00:00
try {
if ( ! lastOnline . empty ( ) ) {
if ( _rc - > clusterMode ) {
2020-05-21 16:49:41 +00:00
auto tx = _cluster - > transaction ( controllerId , true ) ;
2020-05-20 18:38:04 +00:00
_doRedisUpdate ( tx , controllerId , lastOnline ) ;
} else {
auto tx = _redis - > transaction ( true ) ;
_doRedisUpdate ( tx , controllerId , lastOnline ) ;
}
2020-05-18 20:58:29 +00:00
}
2020-05-20 18:38:04 +00:00
} catch ( sw : : redis : : Error & e ) {
2020-05-21 16:49:55 +00:00
# ifdef ZT_TRACE
2020-05-20 18:38:04 +00:00
fprintf ( stderr , " Error in online notification thread (redis): %s \n " , e . what ( ) ) ;
2020-05-21 16:49:55 +00:00
# endif
2020-05-20 18:38:04 +00:00
}
2020-08-21 23:22:28 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 10 ) ) ;
2020-05-14 00:23:27 +00:00
}
}
void PostgreSQL : : _doRedisUpdate ( sw : : redis : : Transaction & tx , std : : string & controllerId ,
std : : unordered_map < std : : pair < uint64_t , uint64_t > , std : : pair < int64_t , InetAddress > , _PairHasher > & lastOnline )
{
nlohmann : : json jtmp1 , jtmp2 ;
for ( auto i = lastOnline . begin ( ) ; i ! = lastOnline . end ( ) ; + + i ) {
uint64_t nwid_i = i - > first . first ;
uint64_t memberid_i = i - > first . second ;
char nwidTmp [ 64 ] ;
char memTmp [ 64 ] ;
char ipTmp [ 64 ] ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
OSUtils : : ztsnprintf ( memTmp , sizeof ( memTmp ) , " %.10llx " , memberid_i ) ;
if ( ! get ( nwid_i , jtmp1 , memberid_i , jtmp2 ) ) {
continue ; // skip non existent members/networks
}
std : : string networkId ( nwidTmp ) ;
std : : string memberId ( memTmp ) ;
int64_t ts = i - > second . first ;
std : : string ipAddr = i - > second . second . toIpString ( ipTmp ) ;
std : : string timestamp = std : : to_string ( ts ) ;
std : : unordered_map < std : : string , std : : string > record = {
{ " id " , memberId } ,
{ " address " , ipAddr } ,
{ " last_updated " , std : : to_string ( ts ) }
} ;
tx . zadd ( " nodes-online:{ " + controllerId + " } " , memberId , ts )
2020-05-28 03:43:20 +00:00
. zadd ( " nodes-online2:{ " + controllerId + " } " , networkId + " - " + memberId , ts )
2020-05-14 00:23:27 +00:00
. zadd ( " network-nodes-online:{ " + controllerId + " }: " + networkId , memberId , ts )
2020-05-22 17:07:39 +00:00
. zadd ( " active-networks:{ " + controllerId + " } " , networkId , ts )
2020-05-14 00:23:27 +00:00
. sadd ( " network-nodes-all:{ " + controllerId + " }: " + networkId , memberId )
2020-05-20 23:28:28 +00:00
. hmset ( " member:{ " + controllerId + " }: " + networkId + " : " + memberId , record . begin ( ) , record . end ( ) ) ;
2020-05-14 00:23:27 +00:00
}
// expire records from all-nodes and network-nodes member list
uint64_t expireOld = OSUtils : : now ( ) - 300000 ;
tx . zremrangebyscore ( " nodes-online:{ " + controllerId + " } " , sw : : redis : : RightBoundedInterval < double > ( expireOld , sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
2020-05-28 03:43:20 +00:00
tx . zremrangebyscore ( " nodes-online2:{ " + controllerId + " } " , sw : : redis : : RightBoundedInterval < double > ( expireOld , sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
2020-05-22 18:07:12 +00:00
tx . zremrangebyscore ( " active-networks:{ " + controllerId + " } " , sw : : redis : : RightBoundedInterval < double > ( expireOld , sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
2020-05-28 03:41:47 +00:00
{
std : : lock_guard < std : : mutex > l ( _networks_l ) ;
for ( const auto & it : _networks ) {
uint64_t nwid_i = it . first ;
char nwidTmp [ 64 ] ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
tx . zremrangebyscore ( " network-nodes-online:{ " + controllerId + " }: " + nwidTmp ,
sw : : redis : : RightBoundedInterval < double > ( expireOld , sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
}
2020-05-14 00:23:27 +00:00
}
tx . exec ( ) ;
}
2021-09-21 15:51:26 +00:00
/**
 * Open a new libpq connection.
 *
 * When m is ALLOW_PGBOUNCER_OVERRIDE and the override environment variable
 * is set, its value (with an application_name suffix built from
 * _myAddressStr) is used as the connection string; otherwise _connString is
 * used.
 *
 * @param m  Whether the environment override may be applied.
 * @return   The handle returned by PQconnectdb(); the connection status is
 *           not checked here, and the caller closes it with PQfinish().
 */
PGconn *PostgreSQL::getPgConn(OverrideMode m)
{
	// Only consult the environment when the caller allows the override.
	const char *overrideStr = (m == ALLOW_PGBOUNCER_OVERRIDE) ? getenv(" PGBOUNCER_CONNSTR ") : NULL;

	if (overrideStr == NULL) {
		return PQconnectdb(_connString.c_str());
	}

	fprintf(stderr, " PGBouncer Override \n ");
	const std::string fullConnStr = std::string(overrideStr) + " application_name=controller- " + _myAddressStr.c_str();
	return PQconnectdb(fullConnStr.c_str());
}
2019-08-06 12:51:50 +00:00
2018-09-28 17:55:39 +00:00
# endif //ZT_CONTROLLER_USE_LIBPQ