2018-08-31 21:58:15 +00:00
/*
2019-08-23 16:23:39 +00:00
* Copyright ( c ) 2019 ZeroTier , Inc .
2018-08-31 21:58:15 +00:00
*
2019-08-23 16:23:39 +00:00
* Use of this software is governed by the Business Source License included
* in the LICENSE . TXT file in the project ' s root directory .
2018-08-31 21:58:15 +00:00
*
2020-08-20 19:51:39 +00:00
* Change Date : 2025 - 01 - 01
2018-08-31 21:58:15 +00:00
*
2019-08-23 16:23:39 +00:00
* On the date above , in accordance with the Business Source License , use
* of this software will be governed by version 2.0 of the Apache License .
2018-08-31 21:58:15 +00:00
*/
2019-08-23 16:23:39 +00:00
/****/
2018-08-31 21:58:15 +00:00
2019-08-06 12:51:50 +00:00
# include "PostgreSQL.hpp"
2018-08-31 21:58:15 +00:00
# ifdef ZT_CONTROLLER_USE_LIBPQ
2019-08-06 12:51:50 +00:00
# include "../node/Constants.hpp"
2021-05-28 21:08:24 +00:00
# include "../node/SHA512.hpp"
2018-08-31 21:58:15 +00:00
# include "EmbeddedNetworkController.hpp"
# include "../version.h"
2020-05-11 22:03:56 +00:00
# include "Redis.hpp"
2018-08-31 21:58:15 +00:00
2018-11-30 18:37:27 +00:00
# include <libpq-fe.h>
2018-09-28 17:55:39 +00:00
# include <sstream>
2021-12-01 01:31:46 +00:00
# include <iomanip>
2020-05-20 23:54:18 +00:00
# include <climits>
2021-08-20 00:55:30 +00:00
# include <chrono>
2020-05-11 19:29:06 +00:00
2018-08-31 21:58:15 +00:00
2022-04-28 20:05:02 +00:00
// #define REDIS_TRACE 1
2021-06-05 20:44:45 +00:00
2018-08-31 21:58:15 +00:00
using json = nlohmann : : json ;
2019-08-06 12:51:50 +00:00
2018-08-31 21:58:15 +00:00
namespace {
2023-01-19 23:39:15 +00:00
static const int DB_MINIMUM_VERSION = 38 ;
2019-07-16 19:15:38 +00:00
2018-08-31 21:58:15 +00:00
static const char * _timestr ( )
{
2021-06-02 20:46:54 +00:00
2018-08-31 21:58:15 +00:00
time_t t = time ( 0 ) ;
char * ts = ctime ( & t ) ;
char * p = ts ;
if ( ! p )
return " " ;
while ( * p ) {
if ( * p = = ' \n ' ) {
* p = ( char ) 0 ;
break ;
}
+ + p ;
}
return ts ;
}
2019-08-06 16:00:35 +00:00
/*
2018-09-06 22:14:16 +00:00
std : : string join ( const std : : vector < std : : string > & elements , const char * const separator )
{
switch ( elements . size ( ) ) {
case 0 :
return " " ;
case 1 :
return elements [ 0 ] ;
default :
std : : ostringstream os ;
std : : copy ( elements . begin ( ) , elements . end ( ) - 1 , std : : ostream_iterator < std : : string > ( os , separator ) ) ;
os < < * elements . rbegin ( ) ;
return os . str ( ) ;
}
}
2019-08-06 16:00:35 +00:00
*/
2018-09-06 22:14:16 +00:00
2021-08-20 17:27:45 +00:00
std : : vector < std : : string > split ( std : : string str , char delim ) {
std : : istringstream iss ( str ) ;
std : : vector < std : : string > tokens ;
std : : string item ;
while ( std : : getline ( iss , item , delim ) ) {
tokens . push_back ( item ) ;
}
return tokens ;
}
2021-12-01 01:27:13 +00:00
std : : string url_encode ( const std : : string & value ) {
std : : ostringstream escaped ;
escaped . fill ( ' 0 ' ) ;
escaped < < std : : hex ;
for ( std : : string : : const_iterator i = value . begin ( ) , n = value . end ( ) ; i ! = n ; + + i ) {
std : : string : : value_type c = ( * i ) ;
// Keep alphanumeric and other accepted characters intact
if ( isalnum ( c ) | | c = = ' - ' | | c = = ' _ ' | | c = = ' . ' | | c = = ' ~ ' ) {
escaped < < c ;
continue ;
}
// Any other characters are percent-encoded
escaped < < std : : uppercase ;
escaped < < ' % ' < < std : : setw ( 2 ) < < int ( ( unsigned char ) c ) ;
escaped < < std : : nouppercase ;
}
return escaped . str ( ) ;
}
2021-08-20 17:27:45 +00:00
2019-08-06 12:51:50 +00:00
} // anonymous namespace
2018-08-31 21:58:15 +00:00
using namespace ZeroTier ;
2021-06-02 18:44:00 +00:00
MemberNotificationReceiver : : MemberNotificationReceiver ( PostgreSQL * p , pqxx : : connection & c , const std : : string & channel )
: pqxx : : notification_receiver ( c , channel )
, _psql ( p )
2021-06-02 20:46:54 +00:00
{
2022-12-07 15:17:53 +00:00
fprintf ( stderr , " initialize MemberNotificationReceiver \n " ) ;
2021-06-02 20:46:54 +00:00
}
2021-06-02 18:44:00 +00:00
void MemberNotificationReceiver : : operator ( ) ( const std : : string & payload , int packend_pid ) {
2021-06-02 20:46:54 +00:00
fprintf ( stderr , " Member Notification received: %s \n " , payload . c_str ( ) ) ;
2023-04-21 19:12:43 +00:00
Metrics : : pgsql_mem_notification + + ;
2021-06-02 18:44:00 +00:00
json tmp ( json : : parse ( payload ) ) ;
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_psql - > _memberChanged ( oldConfig , newConfig , ( _psql - > _ready > = 2 ) ) ;
2021-06-02 20:46:54 +00:00
fprintf ( stderr , " payload sent \n " ) ;
2021-06-02 18:44:00 +00:00
}
}
NetworkNotificationReceiver : : NetworkNotificationReceiver ( PostgreSQL * p , pqxx : : connection & c , const std : : string & channel )
: pqxx : : notification_receiver ( c , channel )
, _psql ( p )
2021-06-02 20:46:54 +00:00
{
fprintf ( stderr , " initialize NetworkNotificationReceiver \n " ) ;
}
2021-06-02 18:44:00 +00:00
void NetworkNotificationReceiver : : operator ( ) ( const std : : string & payload , int packend_pid ) {
2022-12-07 15:17:53 +00:00
fprintf ( stderr , " Network Notification received: %s \n " , payload . c_str ( ) ) ;
2023-04-21 19:12:43 +00:00
Metrics : : pgsql_net_notification + + ;
2021-06-02 20:46:54 +00:00
json tmp ( json : : parse ( payload ) ) ;
2021-06-02 18:44:00 +00:00
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_psql - > _networkChanged ( oldConfig , newConfig , ( _psql - > _ready > = 2 ) ) ;
2021-06-02 20:46:54 +00:00
fprintf ( stderr , " payload sent \n " ) ;
2021-06-02 18:44:00 +00:00
}
}
2020-05-12 18:56:19 +00:00
using Attrs = std : : vector < std : : pair < std : : string , std : : string > > ;
using Item = std : : pair < std : : string , Attrs > ;
using ItemStream = std : : vector < Item > ;
2020-05-11 22:03:56 +00:00
PostgreSQL : : PostgreSQL ( const Identity & myId , const char * path , int listenPort , RedisConfig * rc )
2019-08-06 15:42:54 +00:00
: DB ( )
2021-06-02 18:44:00 +00:00
, _pool ( )
2019-08-06 15:42:54 +00:00
, _myId ( myId )
, _myAddress ( myId . address ( ) )
2019-08-06 12:51:50 +00:00
, _ready ( 0 )
2018-08-31 21:58:15 +00:00
, _connected ( 1 )
2019-08-06 12:51:50 +00:00
, _run ( 1 )
, _waitNoticePrinted ( false )
2019-01-21 19:18:20 +00:00
, _listenPort ( listenPort )
2020-05-11 22:03:56 +00:00
, _rc ( rc )
2020-05-11 23:02:49 +00:00
, _redis ( NULL )
, _cluster ( NULL )
2022-05-10 15:36:39 +00:00
, _redisMemberStatus ( false )
2018-08-31 21:58:15 +00:00
{
2019-08-06 15:42:54 +00:00
char myAddress [ 64 ] ;
_myAddressStr = myId . address ( ) . toString ( myAddress ) ;
2021-09-02 18:23:45 +00:00
_connString = std : : string ( path ) ;
2021-06-02 18:44:00 +00:00
auto f = std : : make_shared < PostgresConnFactory > ( _connString ) ;
_pool = std : : make_shared < ConnectionPool < PostgresConnection > > (
15 , 5 , std : : static_pointer_cast < ConnectionFactory > ( f ) ) ;
2021-05-28 21:08:24 +00:00
memset ( _ssoPsk , 0 , sizeof ( _ssoPsk ) ) ;
char * const ssoPskHex = getenv ( " ZT_SSO_PSK " ) ;
2021-09-15 22:27:19 +00:00
# ifdef ZT_TRACE
2021-06-05 03:04:01 +00:00
fprintf ( stderr , " ZT_SSO_PSK: %s \n " , ssoPskHex ) ;
2021-09-15 22:27:19 +00:00
# endif
2021-05-28 21:08:24 +00:00
if ( ssoPskHex ) {
2022-12-07 15:17:53 +00:00
// SECURITY: note that ssoPskHex will always be null-terminated if libc actually
2021-05-28 21:08:24 +00:00
// returns something non-NULL. If the hex encodes something shorter than 48 bytes,
// it will be padded at the end with zeroes. If longer, it'll be truncated.
Utils : : unhex ( ssoPskHex , _ssoPsk , sizeof ( _ssoPsk ) ) ;
}
2022-05-10 15:36:39 +00:00
const char * redisMemberStatus = getenv ( " ZT_REDIS_MEMBER_STATUS " ) ;
2022-06-13 22:44:35 +00:00
if ( redisMemberStatus & & ( strcmp ( redisMemberStatus , " true " ) = = 0 ) ) {
2022-05-10 15:36:39 +00:00
_redisMemberStatus = true ;
fprintf ( stderr , " Using redis for member status \n " ) ;
}
2021-05-28 21:08:24 +00:00
2021-06-02 18:44:00 +00:00
auto c = _pool - > borrow ( ) ;
pqxx : : work txn { * c - > c } ;
2019-07-16 19:15:38 +00:00
2021-06-02 18:44:00 +00:00
pqxx : : row r { txn . exec1 ( " SELECT version FROM ztc_database " ) } ;
int dbVersion = r [ 0 ] . as < int > ( ) ;
txn . commit ( ) ;
2019-07-16 19:15:38 +00:00
if ( dbVersion < DB_MINIMUM_VERSION ) {
fprintf ( stderr , " Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance " , DB_MINIMUM_VERSION ) ;
exit ( 1 ) ;
}
2021-06-02 18:44:00 +00:00
_pool - > unborrow ( c ) ;
2019-07-16 19:15:38 +00:00
2020-05-11 23:02:49 +00:00
if ( _rc ! = NULL ) {
sw : : redis : : ConnectionOptions opts ;
sw : : redis : : ConnectionPoolOptions poolOpts ;
opts . host = _rc - > hostname ;
opts . port = _rc - > port ;
opts . password = _rc - > password ;
opts . db = 0 ;
2022-06-22 17:30:58 +00:00
opts . keep_alive = true ;
2022-06-29 16:39:51 +00:00
opts . connect_timeout = std : : chrono : : seconds ( 3 ) ;
2022-06-22 17:07:55 +00:00
poolOpts . size = 25 ;
2022-06-29 16:39:51 +00:00
poolOpts . wait_timeout = std : : chrono : : seconds ( 5 ) ;
poolOpts . connection_lifetime = std : : chrono : : minutes ( 3 ) ;
poolOpts . connection_idle_time = std : : chrono : : minutes ( 1 ) ;
2020-05-11 23:02:49 +00:00
if ( _rc - > clusterMode ) {
2020-05-13 16:46:41 +00:00
fprintf ( stderr , " Using Redis in Cluster Mode \n " ) ;
2020-05-12 18:56:19 +00:00
_cluster = std : : make_shared < sw : : redis : : RedisCluster > ( opts , poolOpts ) ;
2020-05-11 23:02:49 +00:00
} else {
2020-05-13 16:46:41 +00:00
fprintf ( stderr , " Using Redis in Standalone Mode \n " ) ;
2020-05-12 18:56:19 +00:00
_redis = std : : make_shared < sw : : redis : : Redis > ( opts , poolOpts ) ;
2020-05-11 23:02:49 +00:00
}
}
2018-08-31 21:58:15 +00:00
_readyLock . lock ( ) ;
2020-05-20 23:28:28 +00:00
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download... " ZT_EOL_S , : : _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
_waitNoticePrinted = true ;
2021-06-02 18:44:00 +00:00
initializeNetworks ( ) ;
initializeMembers ( ) ;
2020-05-20 23:28:28 +00:00
2018-08-31 21:58:15 +00:00
_heartbeatThread = std : : thread ( & PostgreSQL : : heartbeat , this ) ;
_membersDbWatcher = std : : thread ( & PostgreSQL : : membersDbWatcher , this ) ;
_networksDbWatcher = std : : thread ( & PostgreSQL : : networksDbWatcher , this ) ;
2018-12-06 21:08:31 +00:00
for ( int i = 0 ; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS ; + + i ) {
2018-08-31 21:58:15 +00:00
_commitThread [ i ] = std : : thread ( & PostgreSQL : : commitThread , this ) ;
}
_onlineNotificationThread = std : : thread ( & PostgreSQL : : onlineNotificationThread , this ) ;
}
PostgreSQL : : ~ PostgreSQL ( )
{
_run = 0 ;
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 100 ) ) ;
2019-08-23 16:23:39 +00:00
2018-08-31 21:58:15 +00:00
_heartbeatThread . join ( ) ;
_membersDbWatcher . join ( ) ;
_networksDbWatcher . join ( ) ;
2020-05-12 18:56:19 +00:00
_commitQueue . stop ( ) ;
2018-12-06 21:08:31 +00:00
for ( int i = 0 ; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS ; + + i ) {
2018-08-31 21:58:15 +00:00
_commitThread [ i ] . join ( ) ;
}
_onlineNotificationThread . join ( ) ;
}
bool PostgreSQL : : waitForReady ( )
{
while ( _ready < 2 ) {
_readyLock . lock ( ) ;
_readyLock . unlock ( ) ;
}
return true ;
}
bool PostgreSQL : : isReady ( )
{
return ( ( _ready = = 2 ) & & ( _connected ) ) ;
}
2019-08-06 15:42:54 +00:00
bool PostgreSQL : : save ( nlohmann : : json & record , bool notifyListeners )
2018-08-31 21:58:15 +00:00
{
2019-08-06 15:42:54 +00:00
bool modified = false ;
2018-10-24 19:06:17 +00:00
try {
2021-06-05 20:44:45 +00:00
if ( ! record . is_object ( ) ) {
fprintf ( stderr , " record is not an object?!? \n " ) ;
2019-08-06 16:00:35 +00:00
return false ;
2021-06-05 20:44:45 +00:00
}
2019-08-06 13:51:23 +00:00
const std : : string objtype = record [ " objtype " ] ;
if ( objtype = = " network " ) {
2021-08-19 16:21:52 +00:00
//fprintf(stderr, "network save\n");
2019-08-06 13:51:23 +00:00
const uint64_t nwid = OSUtils : : jsonIntHex ( record [ " id " ] , 0ULL ) ;
if ( nwid ) {
nlohmann : : json old ;
get ( nwid , old ) ;
2019-08-08 20:29:13 +00:00
if ( ( ! old . is_object ( ) ) | | ( ! _compareRecords ( old , record ) ) ) {
2019-08-06 13:51:23 +00:00
record [ " revision " ] = OSUtils : : jsonInt ( record [ " revision " ] , 0ULL ) + 1ULL ;
2019-08-06 15:42:54 +00:00
_commitQueue . post ( std : : pair < nlohmann : : json , bool > ( record , notifyListeners ) ) ;
modified = true ;
2019-08-06 13:51:23 +00:00
}
}
} else if ( objtype = = " member " ) {
2021-06-05 20:44:07 +00:00
std : : string networkId = record [ " nwid " ] ;
std : : string memberId = record [ " id " ] ;
2019-08-06 13:51:23 +00:00
const uint64_t nwid = OSUtils : : jsonIntHex ( record [ " nwid " ] , 0ULL ) ;
const uint64_t id = OSUtils : : jsonIntHex ( record [ " id " ] , 0ULL ) ;
2021-08-19 16:21:52 +00:00
//fprintf(stderr, "member save %s-%s\n", networkId.c_str(), memberId.c_str());
2019-08-06 13:51:23 +00:00
if ( ( id ) & & ( nwid ) ) {
nlohmann : : json network , old ;
get ( nwid , network , id , old ) ;
2019-08-08 20:29:13 +00:00
if ( ( ! old . is_object ( ) ) | | ( ! _compareRecords ( old , record ) ) ) {
2021-08-19 16:21:52 +00:00
//fprintf(stderr, "commit queue post\n");
2019-08-06 13:51:23 +00:00
record [ " revision " ] = OSUtils : : jsonInt ( record [ " revision " ] , 0ULL ) + 1ULL ;
2019-08-06 15:42:54 +00:00
_commitQueue . post ( std : : pair < nlohmann : : json , bool > ( record , notifyListeners ) ) ;
modified = true ;
2021-06-05 20:44:07 +00:00
} else {
2021-08-19 16:21:52 +00:00
//fprintf(stderr, "no change\n");
2019-08-06 13:51:23 +00:00
}
}
2021-06-05 20:44:07 +00:00
} else {
fprintf ( stderr , " uhh waaat \n " ) ;
2018-10-24 19:06:17 +00:00
}
} catch ( std : : exception & e ) {
fprintf ( stderr , " Error on PostgreSQL::save: %s \n " , e . what ( ) ) ;
} catch ( . . . ) {
fprintf ( stderr , " Unknown error on PostgreSQL::save \n " ) ;
2018-08-31 21:58:15 +00:00
}
2019-08-06 15:42:54 +00:00
return modified ;
2018-08-31 21:58:15 +00:00
}
void PostgreSQL : : eraseNetwork ( const uint64_t networkId )
{
2021-06-02 20:46:54 +00:00
fprintf ( stderr , " PostgreSQL::eraseNetwork \n " ) ;
2018-08-31 21:58:15 +00:00
char tmp2 [ 24 ] ;
waitForReady ( ) ;
Utils : : hex ( networkId , tmp2 ) ;
2019-08-06 15:42:54 +00:00
std : : pair < nlohmann : : json , bool > tmp ;
tmp . first [ " id " ] = tmp2 ;
tmp . first [ " objtype " ] = " _delete_network " ;
tmp . second = true ;
2018-08-31 21:58:15 +00:00
_commitQueue . post ( tmp ) ;
2020-05-14 00:23:27 +00:00
nlohmann : : json nullJson ;
_networkChanged ( tmp . first , nullJson , true ) ;
2018-08-31 21:58:15 +00:00
}
2019-08-23 16:23:39 +00:00
void PostgreSQL : : eraseMember ( const uint64_t networkId , const uint64_t memberId )
2018-08-31 21:58:15 +00:00
{
2021-06-02 20:46:54 +00:00
fprintf ( stderr , " PostgreSQL::eraseMember \n " ) ;
2018-08-31 21:58:15 +00:00
char tmp2 [ 24 ] ;
2020-05-20 23:28:28 +00:00
waitForReady ( ) ;
2020-05-14 00:23:27 +00:00
std : : pair < nlohmann : : json , bool > tmp , nw ;
2018-08-31 21:58:15 +00:00
Utils : : hex ( networkId , tmp2 ) ;
2019-08-06 15:42:54 +00:00
tmp . first [ " nwid " ] = tmp2 ;
2018-08-31 21:58:15 +00:00
Utils : : hex ( memberId , tmp2 ) ;
2019-08-06 15:42:54 +00:00
tmp . first [ " id " ] = tmp2 ;
tmp . first [ " objtype " ] = " _delete_member " ;
tmp . second = true ;
2018-08-31 21:58:15 +00:00
_commitQueue . post ( tmp ) ;
2020-05-14 00:23:27 +00:00
nlohmann : : json nullJson ;
_memberChanged ( tmp . first , nullJson , true ) ;
2018-08-31 21:58:15 +00:00
}
void PostgreSQL : : nodeIsOnline ( const uint64_t networkId , const uint64_t memberId , const InetAddress & physicalAddress )
{
2019-08-06 15:42:54 +00:00
std : : lock_guard < std : : mutex > l ( _lastOnline_l ) ;
std : : pair < int64_t , InetAddress > & i = _lastOnline [ std : : pair < uint64_t , uint64_t > ( networkId , memberId ) ] ;
i . first = OSUtils : : now ( ) ;
if ( physicalAddress ) {
i . second = physicalAddress ;
2018-08-31 21:58:15 +00:00
}
}
2021-11-04 22:40:08 +00:00
AuthInfo PostgreSQL : : getSSOAuthInfo ( const nlohmann : : json & member , const std : : string & redirectURL )
2021-05-28 21:08:24 +00:00
{
2021-06-03 21:38:26 +00:00
// NONCE is just a random character string. no semantic meaning
// state = HMAC SHA384 of Nonce based on shared sso key
//
// need nonce timeout in database? make sure it's used within X time
// X is 5 minutes for now. Make configurable later?
//
// how do we tell when a nonce is used? if auth_expiration_time is set
std : : string networkId = member [ " nwid " ] ;
std : : string memberId = member [ " id " ] ;
2021-11-04 22:40:08 +00:00
char authenticationURL [ 4096 ] = { 0 } ;
AuthInfo info ;
info . enabled = true ;
// fprintf(stderr, "PostgreSQL::updateMemberOnLoad: %s-%s\n", networkId.c_str(), memberId.c_str());
2021-05-28 21:08:24 +00:00
try {
2021-06-02 18:44:00 +00:00
auto c = _pool - > borrow ( ) ;
pqxx : : work w ( * c - > c ) ;
2021-05-28 21:08:24 +00:00
2021-06-05 03:04:01 +00:00
char nonceBytes [ 16 ] = { 0 } ;
2021-06-03 21:38:26 +00:00
std : : string nonce = " " ;
2021-06-04 19:49:26 +00:00
// check if the member exists first.
2021-09-01 19:24:57 +00:00
pqxx : : row count = w . exec_params1 ( " SELECT count(id) FROM ztc_member WHERE id = $1 AND network_id = $2 AND deleted = false " , memberId , networkId ) ;
2021-06-04 19:49:26 +00:00
if ( count [ 0 ] . as < int > ( ) = = 1 ) {
2022-01-20 23:14:29 +00:00
// get active nonce, if exists.
2021-06-04 19:49:26 +00:00
pqxx : : result r = w . exec_params ( " SELECT nonce FROM ztc_sso_expiry "
" WHERE network_id = $1 AND member_id = $2 "
2022-01-20 23:14:29 +00:00
" AND ((NOW() AT TIME ZONE 'UTC') <= authentication_expiry_time) AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration) " ,
2021-06-04 19:49:26 +00:00
networkId , memberId ) ;
2022-01-20 23:14:29 +00:00
if ( r . size ( ) = = 0 ) {
// no active nonce.
// find an unused nonce, if one exists.
pqxx : : result r = w . exec_params ( " SELECT nonce FROM ztc_sso_expiry "
" WHERE network_id = $1 AND member_id = $2 "
" AND authentication_expiry_time IS NULL AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration) " ,
networkId , memberId ) ;
if ( r . size ( ) = = 1 ) {
// we have an existing nonce. Use it
nonce = r . at ( 0 ) [ 0 ] . as < std : : string > ( ) ;
Utils : : unhex ( nonce . c_str ( ) , nonceBytes , sizeof ( nonceBytes ) ) ;
} else if ( r . empty ( ) ) {
// create a nonce
Utils : : getSecureRandom ( nonceBytes , 16 ) ;
char nonceBuf [ 64 ] = { 0 } ;
Utils : : hex ( nonceBytes , sizeof ( nonceBytes ) , nonceBuf ) ;
nonce = std : : string ( nonceBuf ) ;
pqxx : : result ir = w . exec_params0 ( " INSERT INTO ztc_sso_expiry "
" (nonce, nonce_expiration, network_id, member_id) VALUES "
" ($1, TO_TIMESTAMP($2::double precision/1000), $3, $4) " ,
nonce , OSUtils : : now ( ) + 300000 , networkId , memberId ) ;
w . commit ( ) ;
} else {
// > 1 ?!? Thats an error!
fprintf ( stderr , " > 1 unused nonce! \n " ) ;
exit ( 6 ) ;
}
} else if ( r . size ( ) = = 1 ) {
2021-06-04 19:49:26 +00:00
nonce = r . at ( 0 ) [ 0 ] . as < std : : string > ( ) ;
2021-06-05 03:04:01 +00:00
Utils : : unhex ( nonce . c_str ( ) , nonceBytes , sizeof ( nonceBytes ) ) ;
2022-01-20 23:14:29 +00:00
} else {
// more than 1 nonce in use? Uhhh...
fprintf ( stderr , " > 1 nonce in use for network member?!? \n " ) ;
exit ( 7 ) ;
2021-06-04 19:49:26 +00:00
}
2021-06-03 21:38:26 +00:00
2023-01-19 23:39:15 +00:00
r = w . exec_params (
" SELECT oc.client_id, oc.authorization_endpoint, oc.issuer, oc.provider, oc.sso_impl_version "
" FROM ztc_network AS n "
" INNER JOIN ztc_org o "
" ON o.owner_id = n.owner_id "
" LEFT OUTER JOIN ztc_network_oidc_config noc "
" ON noc.network_id = n.id "
" LEFT OUTER JOIN ztc_oidc_config oc "
" ON noc.client_id = oc.client_id AND noc.org_id = o.org_id "
" WHERE n.id = $1 AND n.sso_enabled = true " , networkId ) ;
2021-06-04 19:49:26 +00:00
std : : string client_id = " " ;
std : : string authorization_endpoint = " " ;
2021-12-01 21:01:32 +00:00
std : : string issuer = " " ;
2023-01-19 23:39:15 +00:00
std : : string provider = " " ;
2021-11-04 22:40:08 +00:00
uint64_t sso_version = 0 ;
2021-06-04 19:49:26 +00:00
if ( r . size ( ) = = 1 ) {
client_id = r . at ( 0 ) [ 0 ] . as < std : : string > ( ) ;
authorization_endpoint = r . at ( 0 ) [ 1 ] . as < std : : string > ( ) ;
2021-12-01 21:01:32 +00:00
issuer = r . at ( 0 ) [ 2 ] . as < std : : string > ( ) ;
2023-01-19 23:39:15 +00:00
provider = r . at ( 0 ) [ 3 ] . as < std : : string > ( ) ;
sso_version = r . at ( 0 ) [ 4 ] . as < uint64_t > ( ) ;
2021-06-04 19:49:26 +00:00
} else if ( r . size ( ) > 1 ) {
fprintf ( stderr , " ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s \n " , networkId . c_str ( ) ) ;
2021-06-04 20:20:03 +00:00
} else {
fprintf ( stderr , " No client or auth endpoint?!? \n " ) ;
2021-06-04 19:49:26 +00:00
}
2021-11-04 22:40:08 +00:00
info . version = sso_version ;
2021-06-04 20:20:03 +00:00
// no catch all else because we don't actually care if no records exist here. just continue as normal.
2021-06-04 19:49:26 +00:00
if ( ( ! client_id . empty ( ) ) & & ( ! authorization_endpoint . empty ( ) ) ) {
2021-11-04 22:40:08 +00:00
2021-06-04 19:49:26 +00:00
uint8_t state [ 48 ] ;
2021-06-05 03:04:01 +00:00
HMACSHA384 ( _ssoPsk , nonceBytes , sizeof ( nonceBytes ) , state ) ;
2021-06-04 19:49:26 +00:00
char state_hex [ 256 ] ;
Utils : : hex ( state , 48 , state_hex ) ;
2021-11-04 22:40:08 +00:00
if ( info . version = = 0 ) {
char url [ 2048 ] = { 0 } ;
OSUtils : : ztsnprintf ( url , sizeof ( authenticationURL ) ,
" %s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redirect_uri=%s&nonce=%s&state=%s&client_id=%s " ,
authorization_endpoint . c_str ( ) ,
2021-12-01 01:27:13 +00:00
url_encode ( redirectURL ) . c_str ( ) ,
2021-11-04 22:40:08 +00:00
nonce . c_str ( ) ,
state_hex ,
client_id . c_str ( ) ) ;
info . authenticationURL = std : : string ( url ) ;
} else if ( info . version = = 1 ) {
info . ssoClientID = client_id ;
2021-12-01 21:01:32 +00:00
info . issuerURL = issuer ;
2023-01-19 23:39:15 +00:00
info . ssoProvider = provider ;
2021-11-04 22:40:08 +00:00
info . ssoNonce = nonce ;
2021-12-01 23:02:21 +00:00
info . ssoState = std : : string ( state_hex ) + " _ " + networkId ;
2021-11-04 22:40:08 +00:00
info . centralAuthURL = redirectURL ;
2022-06-15 23:37:09 +00:00
# ifdef ZT_DEBUG
2021-12-01 01:27:13 +00:00
fprintf (
stderr ,
2023-01-19 23:39:15 +00:00
" ssoClientID: %s \n issuerURL: %s \n ssoNonce: %s \n ssoState: %s \n centralAuthURL: %s \n provider: %s \n " ,
2021-12-01 01:27:13 +00:00
info . ssoClientID . c_str ( ) ,
info . issuerURL . c_str ( ) ,
info . ssoNonce . c_str ( ) ,
info . ssoState . c_str ( ) ,
2023-01-19 23:39:15 +00:00
info . centralAuthURL . c_str ( ) ,
provider . c_str ( ) ) ;
2022-06-15 23:37:09 +00:00
# endif
2021-11-04 22:40:08 +00:00
}
2021-06-04 20:20:03 +00:00
} else {
fprintf ( stderr , " client_id: %s \n authorization_endpoint: %s \n " , client_id . c_str ( ) , authorization_endpoint . c_str ( ) ) ;
}
2021-06-02 20:46:54 +00:00
}
2021-06-02 18:44:00 +00:00
2021-06-03 21:38:26 +00:00
_pool - > unborrow ( c ) ;
2021-05-28 21:08:24 +00:00
} catch ( std : : exception & e ) {
2023-02-23 00:13:05 +00:00
fprintf ( stderr , " ERROR: Error updating member on load for network %s: %s \n " , networkId . c_str ( ) , e . what ( ) ) ;
2021-05-28 21:08:24 +00:00
}
2021-06-03 21:38:26 +00:00
2021-11-04 22:40:08 +00:00
return info ; //std::string(authenticationURL);
2021-05-28 21:08:24 +00:00
}
2021-06-02 18:44:00 +00:00
void PostgreSQL : : initializeNetworks ( )
2018-08-31 21:58:15 +00:00
{
2018-09-05 18:49:07 +00:00
try {
2020-05-27 01:54:27 +00:00
std : : string setKey = " networks:{ " + _myAddressStr + " } " ;
2020-05-20 23:28:28 +00:00
fprintf ( stderr , " Initializing Networks... \n " ) ;
2021-08-19 19:44:02 +00:00
2022-06-15 23:05:09 +00:00
if ( _redisMemberStatus ) {
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " Init Redis for networks... \n " ) ;
2022-06-15 23:05:09 +00:00
try {
if ( _rc - > clusterMode ) {
_cluster - > del ( setKey ) ;
} else {
_redis - > del ( setKey ) ;
}
2022-06-16 18:52:35 +00:00
} catch ( sw : : redis : : Error & e ) {
// ignore. if this key doesn't exist, there's no reason to delete it
}
2022-06-15 23:05:09 +00:00
}
std : : unordered_set < std : : string > networkSet ;
2021-08-19 19:44:02 +00:00
char qbuf [ 2048 ] = { 0 } ;
2023-01-19 23:39:15 +00:00
sprintf ( qbuf ,
" SELECT n.id, (EXTRACT(EPOCH FROM n.creation_time AT TIME ZONE 'UTC')*1000)::bigint as creation_time, n.capabilities, "
2021-08-20 00:55:30 +00:00
" n.enable_broadcast, (EXTRACT(EPOCH FROM n.last_modified AT TIME ZONE 'UTC')*1000)::bigint AS last_modified, n.mtu, n.multicast_limit, n.name, n.private, n.remote_trace_level, "
2023-01-19 23:39:15 +00:00
" n.remote_trace_target, n.revision, n.rules, n.tags, n.v4_assign_mode, n.v6_assign_mode, n.sso_enabled, (CASE WHEN n.sso_enabled THEN noc.client_id ELSE NULL END) as client_id, "
" (CASE WHEN n.sso_enabled THEN oc.authorization_endpoint ELSE NULL END) as authorization_endpoint, "
" (CASE WHEN n.sso_enabled THEN oc.provider ELSE NULL END) as provider, d.domain, d.servers, "
2021-08-20 17:27:45 +00:00
" ARRAY(SELECT CONCAT(host(ip_range_start),'|', host(ip_range_end)) FROM ztc_network_assignment_pool WHERE network_id = n.id) AS assignment_pool, "
" ARRAY(SELECT CONCAT(host(address),'/',bits::text,'|',COALESCE(host(via), 'NULL'))FROM ztc_network_route WHERE network_id = n.id) AS routes "
2021-08-20 00:55:30 +00:00
" FROM ztc_network n "
" LEFT OUTER JOIN ztc_org o "
2023-01-19 23:39:15 +00:00
" ON o.owner_id = n.owner_id "
" LEFT OUTER JOIN ztc_network_oidc_config noc "
" ON noc.network_id = n.id "
" LEFT OUTER JOIN ztc_oidc_config oc "
" ON noc.client_id = oc.client_id AND oc.org_id = o.org_id "
2021-08-20 00:55:30 +00:00
" LEFT OUTER JOIN ztc_network_dns d "
" ON d.network_id = n.id "
2021-08-19 19:44:02 +00:00
" WHERE deleted = false AND controller_id = '%s' " , _myAddressStr . c_str ( ) ) ;
auto c = _pool - > borrow ( ) ;
auto c2 = _pool - > borrow ( ) ;
pqxx : : work w { * c - > c } ;
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " Load networks from psql... \n " ) ;
2021-08-19 19:44:02 +00:00
auto stream = pqxx : : stream_from : : query ( w , qbuf ) ;
std : : tuple <
std : : string // network ID
, std : : optional < int64_t > // creationTime
, std : : optional < std : : string > // capabilities
, std : : optional < bool > // enableBroadcast
, std : : optional < uint64_t > // lastModified
, std : : optional < int > // mtu
, std : : optional < int > // multicastLimit
, std : : optional < std : : string > // name
, bool // private
, std : : optional < int > // remoteTraceLevel
, std : : optional < std : : string > // remoteTraceTarget
, std : : optional < uint64_t > // revision
, std : : optional < std : : string > // rules
, std : : optional < std : : string > // tags
, std : : optional < std : : string > // v4AssignMode
, std : : optional < std : : string > // v6AssignMode
, std : : optional < bool > // ssoEnabled
2021-08-20 00:55:30 +00:00
, std : : optional < std : : string > // clientId
, std : : optional < std : : string > // authorizationEndpoint
2023-01-19 23:39:15 +00:00
, std : : optional < std : : string > // ssoProvider
2021-08-20 00:55:30 +00:00
, std : : optional < std : : string > // domain
, std : : optional < std : : string > // servers
2021-08-20 17:27:45 +00:00
, std : : string // assignmentPoolString
, std : : string // routeString
2021-08-19 19:44:02 +00:00
> row ;
2021-08-20 00:55:30 +00:00
uint64_t count = 0 ;
auto tmp = std : : chrono : : high_resolution_clock : : now ( ) ;
uint64_t total = 0 ;
2021-08-19 19:44:02 +00:00
while ( stream > > row ) {
2021-08-20 00:55:30 +00:00
auto start = std : : chrono : : high_resolution_clock : : now ( ) ;
2018-09-05 18:49:07 +00:00
json empty ;
json config ;
2018-09-05 23:54:23 +00:00
2021-06-03 21:38:26 +00:00
initNetwork ( config ) ;
2021-08-19 19:44:02 +00:00
std : : string nwid = std : : get < 0 > ( row ) ;
std : : optional < int64_t > creationTime = std : : get < 1 > ( row ) ;
std : : optional < std : : string > capabilities = std : : get < 2 > ( row ) ;
std : : optional < bool > enableBroadcast = std : : get < 3 > ( row ) ;
std : : optional < uint64_t > lastModified = std : : get < 4 > ( row ) ;
std : : optional < int > mtu = std : : get < 5 > ( row ) ;
std : : optional < int > multicastLimit = std : : get < 6 > ( row ) ;
std : : optional < std : : string > name = std : : get < 7 > ( row ) ;
bool isPrivate = std : : get < 8 > ( row ) ;
std : : optional < int > remoteTraceLevel = std : : get < 9 > ( row ) ;
std : : optional < std : : string > remoteTraceTarget = std : : get < 10 > ( row ) ;
std : : optional < uint64_t > revision = std : : get < 11 > ( row ) ;
std : : optional < std : : string > rules = std : : get < 12 > ( row ) ;
std : : optional < std : : string > tags = std : : get < 13 > ( row ) ;
std : : optional < std : : string > v4AssignMode = std : : get < 14 > ( row ) ;
std : : optional < std : : string > v6AssignMode = std : : get < 15 > ( row ) ;
std : : optional < bool > ssoEnabled = std : : get < 16 > ( row ) ;
2021-08-20 00:55:30 +00:00
std : : optional < std : : string > clientId = std : : get < 17 > ( row ) ;
std : : optional < std : : string > authorizationEndpoint = std : : get < 18 > ( row ) ;
2023-01-19 23:39:15 +00:00
std : : optional < std : : string > ssoProvider = std : : get < 19 > ( row ) ;
std : : optional < std : : string > dnsDomain = std : : get < 20 > ( row ) ;
std : : optional < std : : string > dnsServers = std : : get < 21 > ( row ) ;
std : : string assignmentPoolString = std : : get < 22 > ( row ) ;
std : : string routesString = std : : get < 23 > ( row ) ;
2021-06-02 18:44:00 +00:00
2021-08-19 19:44:02 +00:00
config [ " id " ] = nwid ;
config [ " nwid " ] = nwid ;
config [ " creationTime " ] = creationTime . value_or ( 0 ) ;
config [ " capabilities " ] = json : : parse ( capabilities . value_or ( " [] " ) ) ;
config [ " enableBroadcast " ] = enableBroadcast . value_or ( false ) ;
config [ " lastModified " ] = lastModified . value_or ( 0 ) ;
config [ " mtu " ] = mtu . value_or ( 2800 ) ;
config [ " multicastLimit " ] = multicastLimit . value_or ( 64 ) ;
config [ " name " ] = name . value_or ( " " ) ;
config [ " private " ] = isPrivate ;
config [ " remoteTraceLevel " ] = remoteTraceLevel . value_or ( 0 ) ;
config [ " remoteTraceTarget " ] = remoteTraceTarget . value_or ( " " ) ;
config [ " revision " ] = revision . value_or ( 0 ) ;
config [ " rules " ] = json : : parse ( rules . value_or ( " [] " ) ) ;
config [ " tags " ] = json : : parse ( tags . value_or ( " [] " ) ) ;
config [ " v4AssignMode " ] = json : : parse ( v4AssignMode . value_or ( " {} " ) ) ;
config [ " v6AssignMode " ] = json : : parse ( v6AssignMode . value_or ( " {} " ) ) ;
config [ " ssoEnabled " ] = ssoEnabled . value_or ( false ) ;
config [ " objtype " ] = " network " ;
config [ " ipAssignmentPools " ] = json : : array ( ) ;
config [ " routes " ] = json : : array ( ) ;
2021-08-20 00:55:30 +00:00
config [ " clientId " ] = clientId . value_or ( " " ) ;
config [ " authorizationEndpoint " ] = authorizationEndpoint . value_or ( " " ) ;
2023-01-19 23:39:15 +00:00
config [ " provider " ] = ssoProvider . value_or ( " " ) ;
2021-08-20 00:55:30 +00:00
2022-06-15 23:05:09 +00:00
networkSet . insert ( nwid ) ;
2021-08-20 00:55:30 +00:00
if ( dnsDomain . has_value ( ) ) {
std : : string serverList = dnsServers . value ( ) ;
json obj ;
auto servers = json : : array ( ) ;
if ( serverList . rfind ( " { " , 0 ) ! = std : : string : : npos ) {
serverList = serverList . substr ( 1 , serverList . size ( ) - 2 ) ;
std : : stringstream ss ( serverList ) ;
while ( ss . good ( ) ) {
std : : string server ;
std : : getline ( ss , server , ' , ' ) ;
servers . push_back ( server ) ;
}
}
obj [ " domain " ] = dnsDomain . value ( ) ;
obj [ " servers " ] = servers ;
config [ " dns " ] = obj ;
}
2018-11-30 18:37:27 +00:00
2021-08-20 17:27:45 +00:00
config [ " ipAssignmentPools " ] = json : : array ( ) ;
if ( assignmentPoolString ! = " {} " ) {
std : : string tmp = assignmentPoolString . substr ( 1 , assignmentPoolString . size ( ) - 2 ) ;
std : : vector < std : : string > assignmentPools = split ( tmp , ' , ' ) ;
for ( auto it = assignmentPools . begin ( ) ; it ! = assignmentPools . end ( ) ; + + it ) {
std : : vector < std : : string > r = split ( * it , ' | ' ) ;
2021-08-19 19:44:02 +00:00
json ip ;
2021-08-20 17:27:45 +00:00
ip [ " ipRangeStart " ] = r [ 0 ] ;
ip [ " ipRangeEnd " ] = r [ 1 ] ;
2021-08-19 19:44:02 +00:00
config [ " ipAssignmentPools " ] . push_back ( ip ) ;
}
2021-08-20 17:27:45 +00:00
}
2018-09-04 23:05:34 +00:00
2021-08-20 17:27:45 +00:00
config [ " routes " ] = json : : array ( ) ;
if ( routesString ! = " {} " ) {
std : : string tmp = routesString . substr ( 1 , routesString . size ( ) - 2 ) ;
std : : vector < std : : string > routes = split ( tmp , ' , ' ) ;
for ( auto it = routes . begin ( ) ; it ! = routes . end ( ) ; + + it ) {
std : : vector < std : : string > r = split ( * it , ' | ' ) ;
2021-08-19 19:44:02 +00:00
json route ;
2021-08-20 17:27:45 +00:00
route [ " target " ] = r [ 0 ] ;
route [ " via " ] = ( ( route [ " via " ] = = " NULL " ) ? nullptr : r [ 1 ] ) ;
2021-08-19 19:44:02 +00:00
config [ " routes " ] . push_back ( route ) ;
2018-09-05 18:49:07 +00:00
}
2018-09-04 23:05:34 +00:00
}
2023-04-21 19:12:43 +00:00
Metrics : : network_count + + ;
2021-08-20 00:55:30 +00:00
_networkChanged ( empty , config , false ) ;
2020-08-04 16:45:45 +00:00
2021-08-20 00:55:30 +00:00
auto end = std : : chrono : : high_resolution_clock : : now ( ) ;
auto dur = std : : chrono : : duration_cast < std : : chrono : : microseconds > ( end - start ) ; ;
total + = dur . count ( ) ;
+ + count ;
2021-09-15 16:45:10 +00:00
if ( count > 0 & & count % 10000 = = 0 ) {
2021-08-20 00:55:30 +00:00
fprintf ( stderr , " Averaging %llu us per network \n " , ( total / count ) ) ;
2021-06-03 21:38:26 +00:00
}
2018-09-05 18:49:07 +00:00
}
2018-09-04 21:00:02 +00:00
2021-09-15 16:45:10 +00:00
if ( count > 0 ) {
fprintf ( stderr , " Took %llu us per network to load \n " , ( total / count ) ) ;
}
2021-08-20 00:55:30 +00:00
stream . complete ( ) ;
2021-08-19 19:44:02 +00:00
2021-06-02 18:44:00 +00:00
w . commit ( ) ;
2021-08-19 19:44:02 +00:00
_pool - > unborrow ( c2 ) ;
2021-06-02 18:44:00 +00:00
_pool - > unborrow ( c ) ;
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " done. \n " ) ;
2020-05-27 01:54:27 +00:00
2022-06-15 23:05:09 +00:00
if ( ! networkSet . empty ( ) ) {
if ( _redisMemberStatus ) {
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " adding networks to redis... \n " ) ;
2022-06-15 23:05:09 +00:00
if ( _rc - > clusterMode ) {
2022-06-30 16:46:38 +00:00
auto tx = _cluster - > transaction ( _myAddressStr , true , false ) ;
2022-06-15 23:05:09 +00:00
tx . sadd ( setKey , networkSet . begin ( ) , networkSet . end ( ) ) ;
tx . exec ( ) ;
} else {
2022-06-30 16:46:38 +00:00
auto tx = _redis - > transaction ( true , false ) ;
2022-06-15 23:05:09 +00:00
tx . sadd ( setKey , networkSet . begin ( ) , networkSet . end ( ) ) ;
tx . exec ( ) ;
}
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " done. \n " ) ;
2022-06-15 23:05:09 +00:00
}
}
2018-09-05 18:49:07 +00:00
if ( + + this - > _ready = = 2 ) {
if ( _waitNoticePrinted ) {
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL data download complete. " ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
_readyLock . unlock ( ) ;
2018-08-31 21:58:15 +00:00
}
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " network init done. \n " ) ;
2020-05-20 23:28:28 +00:00
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error initializing networks in Redis: %s \n " , e . what ( ) ) ;
2022-06-16 18:52:35 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 5000 ) ) ;
2020-05-20 23:28:28 +00:00
exit ( - 1 ) ;
2018-09-05 18:49:07 +00:00
} catch ( std : : exception & e ) {
2020-05-20 23:28:28 +00:00
fprintf ( stderr , " ERROR: Error initializing networks: %s \n " , e . what ( ) ) ;
2022-06-16 18:52:35 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 5000 ) ) ;
2018-09-05 18:49:07 +00:00
exit ( - 1 ) ;
2018-08-31 21:58:15 +00:00
}
}
2021-06-02 18:44:00 +00:00
void PostgreSQL : : initializeMembers ( )
2018-08-31 21:58:15 +00:00
{
2021-06-04 21:40:14 +00:00
std : : string memberId ;
std : : string networkId ;
2018-09-05 18:49:07 +00:00
try {
2020-05-27 01:54:27 +00:00
std : : unordered_map < std : : string , std : : string > networkMembers ;
2020-05-20 23:28:28 +00:00
fprintf ( stderr , " Initializing Members... \n " ) ;
2021-08-19 19:44:02 +00:00
2022-06-15 23:05:09 +00:00
std : : string setKeyBase = " network-nodes-all:{ " + _myAddressStr + " }: " ;
if ( _redisMemberStatus ) {
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " Initialize Redis for members... \n " ) ;
2022-06-15 23:05:09 +00:00
std : : lock_guard < std : : mutex > l ( _networks_l ) ;
std : : unordered_set < std : : string > deletes ;
for ( auto it : _networks ) {
uint64_t nwid_i = it . first ;
char nwidTmp [ 64 ] = { 0 } ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
std : : string nwid ( nwidTmp ) ;
std : : string key = setKeyBase + nwid ;
deletes . insert ( key ) ;
}
if ( ! deletes . empty ( ) ) {
try {
if ( _rc - > clusterMode ) {
2022-06-30 16:46:38 +00:00
auto tx = _cluster - > transaction ( _myAddressStr , true , false ) ;
2022-06-15 23:05:09 +00:00
for ( std : : string k : deletes ) {
tx . del ( k ) ;
}
tx . exec ( ) ;
} else {
2022-06-30 16:46:38 +00:00
auto tx = _redis - > transaction ( true , false ) ;
2022-06-15 23:05:09 +00:00
for ( std : : string k : deletes ) {
tx . del ( k ) ;
}
tx . exec ( ) ;
}
2022-06-16 18:52:35 +00:00
} catch ( sw : : redis : : Error & e ) {
// ignore
}
2022-06-15 23:05:09 +00:00
}
}
2021-08-19 19:44:02 +00:00
char qbuf [ 2048 ] ;
2022-09-28 20:10:34 +00:00
sprintf ( qbuf ,
" SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, "
" (EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000)::bigint, m.identity, "
" (EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
" (EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
" m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, "
" m.no_auto_assign_ips, m.revision, m.sso_exempt, "
" (CASE WHEN n.sso_enabled = TRUE AND m.sso_exempt = FALSE THEN "
" ( "
" SELECT (EXTRACT(EPOCH FROM e.authentication_expiry_time)*1000)::bigint "
" FROM ztc_sso_expiry e "
" INNER JOIN ztc_network n1 "
" ON n1.id = e.network_id AND n1.deleted = TRUE "
" WHERE e.network_id = m.network_id AND e.member_id = m.id AND n.sso_enabled = TRUE AND e.authentication_expiry_time IS NOT NULL "
" ORDER BY e.authentication_expiry_time DESC LIMIT 1 "
" ) "
" ELSE NULL "
" END) AS authentication_expiry_time, "
" ARRAY(SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = m.id AND network_id = m.network_id) AS assigned_addresses "
2018-09-05 18:49:07 +00:00
" FROM ztc_member m "
" INNER JOIN ztc_network n "
" ON n.id = m.network_id "
2022-09-28 20:10:34 +00:00
" WHERE n.controller_id = '%s' AND n.deleted = FALSE AND m.deleted = FALSE " , _myAddressStr . c_str ( ) ) ;
2021-08-19 19:44:02 +00:00
auto c = _pool - > borrow ( ) ;
auto c2 = _pool - > borrow ( ) ;
pqxx : : work w { * c - > c } ;
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " Load members from psql... \n " ) ;
2021-08-19 19:44:02 +00:00
auto stream = pqxx : : stream_from : : query ( w , qbuf ) ;
std : : tuple <
std : : string // memberId
, std : : string // memberId
, std : : optional < bool > // activeBridge
, std : : optional < bool > // authorized
, std : : optional < std : : string > // capabilities
, std : : optional < uint64_t > // creationTime
, std : : optional < std : : string > // identity
, std : : optional < uint64_t > // lastAuthorizedTime
, std : : optional < uint64_t > // lastDeauthorizedTime
, std : : optional < int > // remoteTraceLevel
, std : : optional < std : : string > // remoteTraceTarget
, std : : optional < std : : string > // tags
, std : : optional < int > // vMajor
, std : : optional < int > // vMinor
, std : : optional < int > // vRev
, std : : optional < int > // vProto
, std : : optional < bool > // noAutoAssignIps
, std : : optional < uint64_t > // revision
, std : : optional < bool > // ssoExempt
2021-08-20 17:27:45 +00:00
, std : : optional < uint64_t > // authenticationExpiryTime
, std : : string // assignedAddresses
2021-08-19 19:44:02 +00:00
> row ;
2021-08-20 00:55:30 +00:00
uint64_t count = 0 ;
auto tmp = std : : chrono : : high_resolution_clock : : now ( ) ;
uint64_t total = 0 ;
2021-08-19 19:44:02 +00:00
while ( stream > > row ) {
2021-08-20 00:55:30 +00:00
auto start = std : : chrono : : high_resolution_clock : : now ( ) ;
2018-09-05 18:49:07 +00:00
json empty ;
json config ;
2021-06-04 21:40:14 +00:00
2021-06-03 21:38:26 +00:00
initMember ( config ) ;
2021-08-19 19:44:02 +00:00
memberId = std : : get < 0 > ( row ) ;
networkId = std : : get < 1 > ( row ) ;
std : : optional < bool > activeBridge = std : : get < 2 > ( row ) ;
std : : optional < bool > authorized = std : : get < 3 > ( row ) ;
std : : optional < std : : string > capabilities = std : : get < 4 > ( row ) ;
std : : optional < uint64_t > creationTime = std : : get < 5 > ( row ) ;
std : : optional < std : : string > identity = std : : get < 6 > ( row ) ;
std : : optional < uint64_t > lastAuthorizedTime = std : : get < 7 > ( row ) ;
std : : optional < uint64_t > lastDeauthorizedTime = std : : get < 8 > ( row ) ;
std : : optional < int > remoteTraceLevel = std : : get < 9 > ( row ) ;
std : : optional < std : : string > remoteTraceTarget = std : : get < 10 > ( row ) ;
std : : optional < std : : string > tags = std : : get < 11 > ( row ) ;
std : : optional < int > vMajor = std : : get < 12 > ( row ) ;
std : : optional < int > vMinor = std : : get < 13 > ( row ) ;
std : : optional < int > vRev = std : : get < 14 > ( row ) ;
std : : optional < int > vProto = std : : get < 15 > ( row ) ;
std : : optional < bool > noAutoAssignIps = std : : get < 16 > ( row ) ;
std : : optional < uint64_t > revision = std : : get < 17 > ( row ) ;
std : : optional < bool > ssoExempt = std : : get < 18 > ( row ) ;
2021-08-20 17:27:45 +00:00
std : : optional < uint64_t > authenticationExpiryTime = std : : get < 19 > ( row ) ;
std : : string assignedAddresses = std : : get < 20 > ( row ) ;
2020-05-20 23:28:28 +00:00
2022-06-15 23:05:09 +00:00
networkMembers . insert ( std : : pair < std : : string , std : : string > ( setKeyBase + networkId , memberId ) ) ;
2018-09-05 18:49:07 +00:00
config [ " id " ] = memberId ;
2022-02-28 20:26:32 +00:00
config [ " address " ] = memberId ;
2018-09-05 18:49:07 +00:00
config [ " nwid " ] = networkId ;
2021-08-19 19:44:02 +00:00
config [ " activeBridge " ] = activeBridge . value_or ( false ) ;
config [ " authorized " ] = authorized . value_or ( false ) ;
config [ " capabilities " ] = json : : parse ( capabilities . value_or ( " [] " ) ) ;
config [ " creationTime " ] = creationTime . value_or ( 0 ) ;
config [ " identity " ] = identity . value_or ( " " ) ;
config [ " lastAuthorizedTime " ] = lastAuthorizedTime . value_or ( 0 ) ;
config [ " lastDeauthorizedTime " ] = lastDeauthorizedTime . value_or ( 0 ) ;
config [ " remoteTraceLevel " ] = remoteTraceLevel . value_or ( 0 ) ;
config [ " remoteTraceTarget " ] = remoteTraceTarget . value_or ( " " ) ;
config [ " tags " ] = json : : parse ( tags . value_or ( " [] " ) ) ;
config [ " vMajor " ] = vMajor . value_or ( - 1 ) ;
config [ " vMinor " ] = vMinor . value_or ( - 1 ) ;
config [ " vRev " ] = vRev . value_or ( - 1 ) ;
config [ " vProto " ] = vProto . value_or ( - 1 ) ;
config [ " noAutoAssignIps " ] = noAutoAssignIps . value_or ( false ) ;
config [ " revision " ] = revision . value_or ( 0 ) ;
config [ " ssoExempt " ] = ssoExempt . value_or ( false ) ;
2021-08-20 17:27:45 +00:00
config [ " authenticationExpiryTime " ] = authenticationExpiryTime . value_or ( 0 ) ;
2021-08-20 00:55:30 +00:00
config [ " objtype " ] = " member " ;
2021-08-20 17:27:45 +00:00
config [ " ipAssignments " ] = json : : array ( ) ;
2021-08-19 19:44:02 +00:00
2021-08-20 17:27:45 +00:00
if ( assignedAddresses ! = " {} " ) {
std : : string tmp = assignedAddresses . substr ( 1 , assignedAddresses . size ( ) - 2 ) ;
std : : vector < std : : string > addrs = split ( tmp , ' , ' ) ;
for ( auto it = addrs . begin ( ) ; it ! = addrs . end ( ) ; + + it ) {
config [ " ipAssignments " ] . push_back ( * it ) ;
2020-03-04 07:52:53 +00:00
}
2018-09-05 18:49:07 +00:00
}
2018-08-31 21:58:15 +00:00
2023-04-21 19:12:43 +00:00
Metrics : : member_count + + ;
2018-09-05 18:49:07 +00:00
_memberChanged ( empty , config , false ) ;
2021-08-19 19:44:02 +00:00
memberId = " " ;
networkId = " " ;
2021-08-20 00:55:30 +00:00
auto end = std : : chrono : : high_resolution_clock : : now ( ) ;
2022-06-22 16:25:47 +00:00
auto dur = std : : chrono : : duration_cast < std : : chrono : : microseconds > ( end - start ) ;
2021-08-20 00:55:30 +00:00
total + = dur . count ( ) ;
+ + count ;
2021-09-15 16:45:10 +00:00
if ( count > 0 & & count % 10000 = = 0 ) {
2021-08-20 00:55:30 +00:00
fprintf ( stderr , " Averaging %llu us per member \n " , ( total / count ) ) ;
}
2018-08-31 21:58:15 +00:00
}
2021-09-15 16:45:10 +00:00
if ( count > 0 ) {
fprintf ( stderr , " Took %llu us per member to load \n " , ( total / count ) ) ;
}
2021-08-20 00:55:30 +00:00
stream . complete ( ) ;
2018-11-30 18:37:27 +00:00
2021-06-02 18:44:00 +00:00
w . commit ( ) ;
2021-08-19 19:44:02 +00:00
_pool - > unborrow ( c2 ) ;
2021-06-02 18:44:00 +00:00
_pool - > unborrow ( c ) ;
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " done. \n " ) ;
2021-06-02 18:44:00 +00:00
2022-06-15 23:05:09 +00:00
if ( ! networkMembers . empty ( ) ) {
if ( _redisMemberStatus ) {
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " Load member data into redis... \n " ) ;
2022-06-15 23:05:09 +00:00
if ( _rc - > clusterMode ) {
2022-06-30 16:46:38 +00:00
auto tx = _cluster - > transaction ( _myAddressStr , true , false ) ;
2022-06-15 23:05:09 +00:00
for ( auto it : networkMembers ) {
tx . sadd ( it . first , it . second ) ;
}
tx . exec ( ) ;
} else {
2022-06-30 16:46:38 +00:00
auto tx = _redis - > transaction ( true , false ) ;
2022-06-15 23:05:09 +00:00
for ( auto it : networkMembers ) {
tx . sadd ( it . first , it . second ) ;
}
tx . exec ( ) ;
}
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " done. \n " ) ;
2022-06-15 23:05:09 +00:00
}
}
2022-06-16 18:52:35 +00:00
fprintf ( stderr , " Done loading members... \n " ) ;
2018-09-05 18:49:07 +00:00
if ( + + this - > _ready = = 2 ) {
if ( _waitNoticePrinted ) {
fprintf ( stderr , " [%s] NOTICE: %.10llx controller PostgreSQL data download complete. " ZT_EOL_S , _timestr ( ) , ( unsigned long long ) _myAddress . toInt ( ) ) ;
}
_readyLock . unlock ( ) ;
}
2020-05-20 23:28:28 +00:00
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error initializing members (redis): %s \n " , e . what ( ) ) ;
2022-06-28 16:27:57 +00:00
exit ( - 1 ) ;
2018-09-05 18:49:07 +00:00
} catch ( std : : exception & e ) {
2021-06-04 21:40:14 +00:00
fprintf ( stderr , " ERROR: Error initializing member: %s-%s %s \n " , networkId . c_str ( ) , memberId . c_str ( ) , e . what ( ) ) ;
2018-09-05 18:49:07 +00:00
exit ( - 1 ) ;
2018-08-31 21:58:15 +00:00
}
}
void PostgreSQL : : heartbeat ( )
{
char publicId [ 1024 ] ;
char hostnameTmp [ 1024 ] ;
_myId . toString ( false , publicId ) ;
if ( gethostname ( hostnameTmp , sizeof ( hostnameTmp ) ) ! = 0 ) {
hostnameTmp [ 0 ] = ( char ) 0 ;
} else {
2019-08-08 16:04:11 +00:00
for ( int i = 0 ; i < ( int ) sizeof ( hostnameTmp ) ; + + i ) {
2018-08-31 21:58:15 +00:00
if ( ( hostnameTmp [ i ] = = ' . ' ) | | ( hostnameTmp [ i ] = = 0 ) ) {
hostnameTmp [ i ] = ( char ) 0 ;
break ;
}
}
}
const char * controllerId = _myAddressStr . c_str ( ) ;
const char * publicIdentity = publicId ;
const char * hostname = hostnameTmp ;
while ( _run = = 1 ) {
2021-09-02 23:46:42 +00:00
// fprintf(stderr, "%s: heartbeat\n", controllerId);
2021-06-02 18:44:00 +00:00
auto c = _pool - > borrow ( ) ;
2020-05-22 21:16:04 +00:00
int64_t ts = OSUtils : : now ( ) ;
2021-06-02 18:44:00 +00:00
if ( c - > c ) {
2018-11-30 18:37:27 +00:00
std : : string major = std : : to_string ( ZEROTIER_ONE_VERSION_MAJOR ) ;
std : : string minor = std : : to_string ( ZEROTIER_ONE_VERSION_MINOR ) ;
std : : string rev = std : : to_string ( ZEROTIER_ONE_VERSION_REVISION ) ;
std : : string build = std : : to_string ( ZEROTIER_ONE_VERSION_BUILD ) ;
2020-05-22 21:16:04 +00:00
std : : string now = std : : to_string ( ts ) ;
2019-01-21 19:29:13 +00:00
std : : string host_port = std : : to_string ( _listenPort ) ;
2021-09-02 19:45:19 +00:00
std : : string use_redis = ( _rc ! = NULL ) ? " true " : " false " ;
2022-06-16 17:50:07 +00:00
std : : string redis_mem_status = ( _redisMemberStatus ) ? " true " : " false " ;
2021-06-02 18:44:00 +00:00
try {
2023-03-24 20:03:15 +00:00
pqxx : : work w { * c - > c } ;
pqxx : : result res =
w . exec0 ( " INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis, redis_member_status) "
" VALUES ( " + w . quote ( controllerId ) + " , " + w . quote ( hostname ) + " , TO_TIMESTAMP( " + now + " ::double precision/1000), " +
w . quote ( publicIdentity ) + " , " + major + " , " + minor + " , " + rev + " , " + build + " , " + host_port + " , " + use_redis + " , " + redis_mem_status + " ) "
" ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
" public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
" v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
" use_redis = EXCLUDED.use_redis, redis_member_status = EXCLUDED.redis_member_status " ) ;
w . commit ( ) ;
2021-06-02 18:44:00 +00:00
} catch ( std : : exception & e ) {
2023-03-24 20:03:15 +00:00
fprintf ( stderr , " %s: Heartbeat update failed: %s \n " , controllerId , e . what ( ) ) ;
2021-06-02 18:44:00 +00:00
_pool - > unborrow ( c ) ;
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 1000 ) ) ;
continue ;
2023-03-24 20:03:15 +00:00
}
2018-11-30 18:37:27 +00:00
}
2021-06-02 18:44:00 +00:00
_pool - > unborrow ( c ) ;
2018-11-30 18:37:27 +00:00
2022-06-30 18:40:04 +00:00
try {
if ( _redisMemberStatus ) {
if ( _rc - > clusterMode ) {
_cluster - > zadd ( " controllers " , " controllerId " , ts ) ;
} else {
_redis - > zadd ( " controllers " , " controllerId " , ts ) ;
}
2022-06-15 23:05:09 +00:00
}
2022-06-30 18:40:04 +00:00
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Redis error in heartbeat thread: %s \n " , e . what ( ) ) ;
2022-06-15 23:05:09 +00:00
}
2018-08-31 21:58:15 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 1000 ) ) ;
}
2020-05-12 18:56:19 +00:00
fprintf ( stderr , " Exited heartbeat thread \n " ) ;
2018-08-31 21:58:15 +00:00
}
void PostgreSQL : : membersDbWatcher ( )
{
2020-05-12 18:56:19 +00:00
if ( _rc ) {
_membersWatcher_Redis ( ) ;
2019-03-05 01:01:16 +00:00
} else {
2021-06-02 18:44:00 +00:00
_membersWatcher_Postgres ( ) ;
2019-03-05 01:01:16 +00:00
}
if ( _run = = 1 ) {
fprintf ( stderr , " ERROR: %s membersDbWatcher should still be running! Exiting Controller. \n " , _myAddressStr . c_str ( ) ) ;
exit ( 9 ) ;
}
2019-03-08 18:50:33 +00:00
fprintf ( stderr , " Exited membersDbWatcher \n " ) ;
2019-03-05 01:01:16 +00:00
}
2021-06-02 18:44:00 +00:00
void PostgreSQL : : _membersWatcher_Postgres ( ) {
auto c = _pool - > borrow ( ) ;
2019-03-07 00:16:49 +00:00
2021-06-02 18:44:00 +00:00
std : : string stream = " member_ " + _myAddressStr ;
2018-08-31 21:58:15 +00:00
2021-06-02 18:44:00 +00:00
fprintf ( stderr , " Listening to member stream: %s \n " , stream . c_str ( ) ) ;
MemberNotificationReceiver m ( this , * c - > c , stream ) ;
2018-08-31 21:58:15 +00:00
2021-06-02 18:44:00 +00:00
while ( _run = = 1 ) {
c - > c - > await_notification ( 5 , 0 ) ;
2018-08-31 21:58:15 +00:00
}
2021-06-02 18:44:00 +00:00
_pool - > unborrow ( c ) ;
2019-03-05 01:01:16 +00:00
}
2020-05-12 18:56:19 +00:00
void PostgreSQL : : _membersWatcher_Redis ( ) {
char buf [ 11 ] = { 0 } ;
std : : string key = " member-stream:{ " + std : : string ( _myAddress . toString ( buf ) ) + " } " ;
2022-04-28 18:13:44 +00:00
std : : string lastID = " 0 " ;
2020-07-28 01:37:45 +00:00
fprintf ( stderr , " Listening to member stream: %s \n " , key . c_str ( ) ) ;
2020-05-12 18:56:19 +00:00
while ( _run = = 1 ) {
2020-05-20 18:42:51 +00:00
try {
json tmp ;
std : : unordered_map < std : : string , ItemStream > result ;
if ( _rc - > clusterMode ) {
2022-04-28 20:10:07 +00:00
_cluster - > xread ( key , lastID , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
2020-05-20 18:42:51 +00:00
} else {
2022-04-28 20:10:07 +00:00
_redis - > xread ( key , lastID , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
2020-05-20 18:42:51 +00:00
}
if ( ! result . empty ( ) ) {
for ( auto element : result ) {
2022-04-28 20:05:02 +00:00
# ifdef REDIS_TRACE
2020-05-20 18:42:51 +00:00
fprintf ( stdout , " Received notification from: %s \n " , element . first . c_str ( ) ) ;
# endif
for ( auto rec : element . second ) {
std : : string id = rec . first ;
auto attrs = rec . second ;
2022-04-28 20:05:02 +00:00
# ifdef REDIS_TRACE
2020-05-20 18:42:51 +00:00
fprintf ( stdout , " Record ID: %s \n " , id . c_str ( ) ) ;
fprintf ( stdout , " attrs len: %lu \n " , attrs . size ( ) ) ;
# endif
for ( auto a : attrs ) {
2022-04-28 20:05:02 +00:00
# ifdef REDIS_TRACE
2020-05-20 18:42:51 +00:00
fprintf ( stdout , " key: %s \n value: %s \n " , a . first . c_str ( ) , a . second . c_str ( ) ) ;
# endif
try {
tmp = json : : parse ( a . second ) ;
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_memberChanged ( oldConfig , newConfig , ( this - > _ready > = 2 ) ) ;
}
} catch ( . . . ) {
fprintf ( stderr , " json parse error in networkWatcher_Redis \n " ) ;
2020-05-12 18:56:19 +00:00
}
}
2020-05-20 18:42:51 +00:00
if ( _rc - > clusterMode ) {
_cluster - > xdel ( key , id ) ;
} else {
_redis - > xdel ( key , id ) ;
}
2022-04-28 18:13:44 +00:00
lastID = id ;
2023-04-25 19:44:18 +00:00
Metrics : : redis_mem_notification + + ;
2020-05-12 19:37:05 +00:00
}
2020-05-12 18:56:19 +00:00
}
}
2020-05-20 18:42:51 +00:00
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " Error in Redis members watcher: %s \n " , e . what ( ) ) ;
2020-05-12 18:56:19 +00:00
}
}
fprintf ( stderr , " membersWatcher ended \n " ) ;
2020-05-11 18:48:05 +00:00
}
2018-08-31 21:58:15 +00:00
void PostgreSQL : : networksDbWatcher ( )
{
2020-05-11 22:03:56 +00:00
if ( _rc ) {
_networksWatcher_Redis ( ) ;
2019-03-05 01:01:16 +00:00
} else {
2021-06-02 18:44:00 +00:00
_networksWatcher_Postgres ( ) ;
2019-03-05 01:01:16 +00:00
}
2019-08-23 16:23:39 +00:00
2019-03-05 01:01:16 +00:00
if ( _run = = 1 ) {
fprintf ( stderr , " ERROR: %s networksDbWatcher should still be running! Exiting Controller. \n " , _myAddressStr . c_str ( ) ) ;
exit ( 8 ) ;
}
2019-11-13 20:46:16 +00:00
fprintf ( stderr , " Exited networksDbWatcher \n " ) ;
2019-03-05 01:01:16 +00:00
}
2021-06-02 18:44:00 +00:00
void PostgreSQL : : _networksWatcher_Postgres ( ) {
std : : string stream = " network_ " + _myAddressStr ;
2019-03-07 00:16:49 +00:00
2021-06-02 20:46:54 +00:00
fprintf ( stderr , " Listening to member stream: %s \n " , stream . c_str ( ) ) ;
2021-06-02 18:44:00 +00:00
auto c = _pool - > borrow ( ) ;
NetworkNotificationReceiver n ( this , * c - > c , stream ) ;
2019-03-07 00:16:49 +00:00
2018-11-30 18:37:27 +00:00
while ( _run = = 1 ) {
2021-06-02 18:44:00 +00:00
c - > c - > await_notification ( 5 , 0 ) ;
2018-11-30 18:37:27 +00:00
}
2019-03-05 01:01:16 +00:00
}
2020-05-11 18:48:05 +00:00
void PostgreSQL : : _networksWatcher_Redis ( ) {
2020-05-12 18:56:19 +00:00
char buf [ 11 ] = { 0 } ;
std : : string key = " network-stream:{ " + std : : string ( _myAddress . toString ( buf ) ) + " } " ;
2022-04-28 18:13:44 +00:00
std : : string lastID = " 0 " ;
2020-05-12 18:56:19 +00:00
while ( _run = = 1 ) {
2020-05-20 18:42:51 +00:00
try {
json tmp ;
std : : unordered_map < std : : string , ItemStream > result ;
if ( _rc - > clusterMode ) {
2022-04-28 18:13:44 +00:00
_cluster - > xread ( key , lastID , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
2020-05-20 18:42:51 +00:00
} else {
2022-04-28 18:13:44 +00:00
_redis - > xread ( key , lastID , std : : chrono : : seconds ( 1 ) , 0 , std : : inserter ( result , result . end ( ) ) ) ;
2020-05-20 18:42:51 +00:00
}
if ( ! result . empty ( ) ) {
for ( auto element : result ) {
2022-04-28 20:05:02 +00:00
# ifdef REDIS_TRACE
2020-05-20 18:42:51 +00:00
fprintf ( stdout , " Received notification from: %s \n " , element . first . c_str ( ) ) ;
2020-05-12 19:48:58 +00:00
# endif
2020-05-20 18:42:51 +00:00
for ( auto rec : element . second ) {
std : : string id = rec . first ;
auto attrs = rec . second ;
2022-04-28 20:05:02 +00:00
# ifdef REDIS_TRACE
2020-05-20 18:42:51 +00:00
fprintf ( stdout , " Record ID: %s \n " , id . c_str ( ) ) ;
fprintf ( stdout , " attrs len: %lu \n " , attrs . size ( ) ) ;
2020-05-12 19:48:58 +00:00
# endif
2020-05-20 18:42:51 +00:00
for ( auto a : attrs ) {
2022-04-28 20:05:02 +00:00
# ifdef REDIS_TRACE
2020-05-20 18:42:51 +00:00
fprintf ( stdout , " key: %s \n value: %s \n " , a . first . c_str ( ) , a . second . c_str ( ) ) ;
2020-05-12 19:48:58 +00:00
# endif
2020-05-20 18:42:51 +00:00
try {
tmp = json : : parse ( a . second ) ;
json & ov = tmp [ " old_val " ] ;
json & nv = tmp [ " new_val " ] ;
json oldConfig , newConfig ;
if ( ov . is_object ( ) ) oldConfig = ov ;
if ( nv . is_object ( ) ) newConfig = nv ;
if ( oldConfig . is_object ( ) | | newConfig . is_object ( ) ) {
_networkChanged ( oldConfig , newConfig , ( this - > _ready > = 2 ) ) ;
}
} catch ( . . . ) {
fprintf ( stderr , " json parse error in networkWatcher_Redis \n " ) ;
2020-05-12 18:56:19 +00:00
}
}
2020-05-20 18:42:51 +00:00
if ( _rc - > clusterMode ) {
_cluster - > xdel ( key , id ) ;
} else {
_redis - > xdel ( key , id ) ;
}
2022-04-28 18:13:44 +00:00
lastID = id ;
2020-05-12 19:37:05 +00:00
}
2023-04-21 19:12:43 +00:00
Metrics : : redis_net_notification + + ;
2020-05-12 18:56:19 +00:00
}
}
2020-05-20 18:42:51 +00:00
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " Error in Redis networks watcher: %s \n " , e . what ( ) ) ;
2020-05-12 18:56:19 +00:00
}
}
fprintf ( stderr , " networksWatcher ended \n " ) ;
2020-05-11 18:48:05 +00:00
}
2018-08-31 21:58:15 +00:00
void PostgreSQL : : commitThread ( )
{
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s: commitThread start \n " , _myAddressStr . c_str ( ) ) ;
2019-08-06 15:42:54 +00:00
std : : pair < nlohmann : : json , bool > qitem ;
while ( _commitQueue . get ( qitem ) & ( _run = = 1 ) ) {
2021-08-20 00:55:30 +00:00
//fprintf(stderr, "commitThread tick\n");
2019-08-06 15:42:54 +00:00
if ( ! qitem . first . is_object ( ) ) {
2021-06-02 20:46:54 +00:00
fprintf ( stderr , " not an object \n " ) ;
2018-08-31 21:58:15 +00:00
continue ;
}
2021-10-05 16:25:24 +00:00
std : : shared_ptr < PostgresConnection > c ;
try {
c = _pool - > borrow ( ) ;
} catch ( std : : exception & e ) {
fprintf ( stderr , " ERROR: %s \n " , e . what ( ) ) ;
continue ;
}
if ( ! c ) {
fprintf ( stderr , " Error getting database connection \n " ) ;
continue ;
}
2021-06-02 18:44:00 +00:00
2019-08-06 15:42:54 +00:00
try {
2021-10-06 00:02:50 +00:00
nlohmann : : json & config = ( qitem . first ) ;
const std : : string objtype = config [ " objtype " ] ;
2018-09-05 18:49:07 +00:00
if ( objtype = = " member " ) {
2021-09-02 23:22:52 +00:00
// fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str());
2021-10-06 00:02:50 +00:00
std : : string memberId ;
std : : string networkId ;
2018-11-14 00:00:13 +00:00
try {
2021-06-02 18:44:00 +00:00
pqxx : : work w ( * c - > c ) ;
2021-10-06 00:02:50 +00:00
memberId = config [ " id " ] ;
networkId = config [ " nwid " ] ;
2020-10-05 20:32:47 +00:00
std : : string target = " NULL " ;
2021-10-06 00:02:50 +00:00
if ( ! config [ " remoteTraceTarget " ] . is_null ( ) ) {
target = config [ " remoteTraceTarget " ] ;
2020-10-05 20:32:47 +00:00
}
2021-10-06 00:02:50 +00:00
pqxx : : row nwrow = w . exec_params1 ( " SELECT COUNT(id) FROM ztc_network WHERE id = $1 " , networkId ) ;
int nwcount = nwrow [ 0 ] . as < int > ( ) ;
if ( nwcount ! = 1 ) {
fprintf ( stderr , " network %s does not exist. skipping member upsert \n " , networkId . c_str ( ) ) ;
w . abort ( ) ;
_pool - > unborrow ( c ) ;
continue ;
}
2021-06-02 18:44:00 +00:00
pqxx : : result res = w . exec_params0 (
2020-10-05 20:32:47 +00:00
" INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
" identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
" remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
" VALUES ($1, $2, $3, $4, $5, $6, "
" TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
" $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET "
" active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, "
" identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
" last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
" remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
" revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
" v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto " ,
2021-06-02 18:44:00 +00:00
memberId ,
networkId ,
2021-10-06 00:02:50 +00:00
( bool ) config [ " activeBridge " ] ,
( bool ) config [ " authorized " ] ,
OSUtils : : jsonDump ( config [ " capabilities " ] , - 1 ) ,
OSUtils : : jsonString ( config [ " identity " ] , " " ) ,
( uint64_t ) config [ " lastAuthorizedTime " ] ,
( uint64_t ) config [ " lastDeauthorizedTime " ] ,
( bool ) config [ " noAutoAssignIps " ] ,
( int ) config [ " remoteTraceLevel " ] ,
2021-06-02 18:44:00 +00:00
target ,
2021-10-06 00:02:50 +00:00
( uint64_t ) config [ " revision " ] ,
OSUtils : : jsonDump ( config [ " tags " ] , - 1 ) ,
( int ) config [ " vMajor " ] ,
( int ) config [ " vMinor " ] ,
( int ) config [ " vRev " ] ,
( int ) config [ " vProto " ] ) ;
2021-06-02 18:44:00 +00:00
res = w . exec_params0 ( " DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2 " ,
memberId , networkId ) ;
2020-10-05 18:02:40 +00:00
2020-10-05 20:32:47 +00:00
std : : vector < std : : string > assignments ;
bool ipAssignError = false ;
2021-10-06 00:02:50 +00:00
for ( auto i = config [ " ipAssignments " ] . begin ( ) ; i ! = config [ " ipAssignments " ] . end ( ) ; + + i ) {
2020-10-05 20:32:47 +00:00
std : : string addr = * i ;
2020-10-05 18:02:40 +00:00
2020-10-05 20:32:47 +00:00
if ( std : : find ( assignments . begin ( ) , assignments . end ( ) , addr ) ! = assignments . end ( ) ) {
2020-10-05 18:02:40 +00:00
continue ;
}
2021-06-02 18:44:00 +00:00
res = w . exec_params0 (
2020-10-05 20:32:47 +00:00
" INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING " ,
2021-06-02 18:44:00 +00:00
memberId , networkId , addr ) ;
2020-10-05 20:32:47 +00:00
assignments . push_back ( addr ) ;
}
if ( ipAssignError ) {
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s: ipAssignError \n " , _myAddressStr . c_str ( ) ) ;
2021-10-05 16:25:24 +00:00
w . abort ( ) ;
_pool - > unborrow ( c ) ;
c . reset ( ) ;
2020-10-05 20:32:47 +00:00
continue ;
}
2021-06-02 18:44:00 +00:00
w . commit ( ) ;
2018-10-24 19:06:17 +00:00
2021-10-06 00:02:50 +00:00
const uint64_t nwidInt = OSUtils : : jsonIntHex ( config [ " nwid " ] , 0ULL ) ;
const uint64_t memberidInt = OSUtils : : jsonIntHex ( config [ " id " ] , 0ULL ) ;
2018-11-30 18:37:27 +00:00
if ( nwidInt & & memberidInt ) {
nlohmann : : json nwOrig ;
nlohmann : : json memOrig ;
2021-10-06 00:02:50 +00:00
nlohmann : : json memNew ( config ) ;
2019-08-06 15:42:54 +00:00
2018-11-30 18:37:27 +00:00
get ( nwidInt , nwOrig , memberidInt , memOrig ) ;
2019-08-23 16:23:39 +00:00
2019-08-06 15:42:54 +00:00
_memberChanged ( memOrig , memNew , qitem . second ) ;
2018-11-30 18:37:27 +00:00
} else {
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu \n " , _myAddressStr . c_str ( ) , ( unsigned long long ) nwidInt , ( unsigned long long ) memberidInt ) ;
2018-11-30 18:37:27 +00:00
}
} catch ( std : : exception & e ) {
2021-10-06 00:02:50 +00:00
fprintf ( stderr , " %s ERROR: Error updating member %s-%s: %s \n " , _myAddressStr . c_str ( ) , networkId . c_str ( ) , memberId . c_str ( ) , e . what ( ) ) ;
2018-09-04 23:05:34 +00:00
}
2018-09-05 18:49:07 +00:00
} else if ( objtype = = " network " ) {
try {
2021-09-02 23:22:52 +00:00
// fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str());
2021-06-02 18:44:00 +00:00
pqxx : : work w ( * c - > c ) ;
2021-10-06 00:02:50 +00:00
std : : string id = config [ " id " ] ;
2021-06-02 18:44:00 +00:00
std : : string remoteTraceTarget = " " ;
2021-10-06 00:02:50 +00:00
if ( ! config [ " remoteTraceTarget " ] . is_null ( ) ) {
remoteTraceTarget = config [ " remoteTraceTarget " ] ;
2020-10-05 20:32:47 +00:00
}
2021-06-02 18:44:00 +00:00
std : : string rulesSource = " " ;
2021-10-06 00:02:50 +00:00
if ( config [ " rulesSource " ] . is_string ( ) ) {
rulesSource = config [ " rulesSource " ] ;
2020-10-05 20:32:47 +00:00
}
2018-11-30 18:37:27 +00:00
2020-10-05 20:32:47 +00:00
// This ugly query exists because when we want to mirror networks to/from
// another data store (e.g. FileDB or LFDB) it is possible to get a network
// that doesn't exist in Central's database. This does an upsert and sets
// the owner_id to the "first" global admin in the user DB if the record
// did not previously exist. If the record already exists owner_id is left
// unchanged, so owner_id should be left out of the update clause.
2021-06-02 18:44:00 +00:00
pqxx : : result res = w . exec_params0 (
2020-10-05 20:32:47 +00:00
" INSERT INTO ztc_network (id, creation_time, owner_id, controller_id, capabilities, enable_broadcast, "
" last_modified, mtu, multicast_limit, name, private, "
" remote_trace_level, remote_trace_target, rules, rules_source, "
2021-06-05 20:44:45 +00:00
" tags, v4_assign_mode, v6_assign_mode, sso_enabled) VALUES ( "
2020-10-05 20:32:47 +00:00
" $1, TO_TIMESTAMP($5::double precision/1000), "
" (SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true AND modify = true AND read = true LIMIT 1), "
" $2, $3, $4, TO_TIMESTAMP($5::double precision/1000), "
2022-06-19 21:59:13 +00:00
" $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) "
2020-10-05 20:32:47 +00:00
" ON CONFLICT (id) DO UPDATE set controller_id = EXCLUDED.controller_id, "
" capabilities = EXCLUDED.capabilities, enable_broadcast = EXCLUDED.enable_broadcast, "
" last_modified = EXCLUDED.last_modified, mtu = EXCLUDED.mtu, "
" multicast_limit = EXCLUDED.multicast_limit, name = EXCLUDED.name, "
" private = EXCLUDED.private, remote_trace_level = EXCLUDED.remote_trace_level, "
" remote_trace_target = EXCLUDED.remote_trace_target, rules = EXCLUDED.rules, "
" rules_source = EXCLUDED.rules_source, tags = EXCLUDED.tags, "
2021-06-05 20:44:45 +00:00
" v4_assign_mode = EXCLUDED.v4_assign_mode, v6_assign_mode = EXCLUDED.v6_assign_mode, "
" sso_enabled = EXCLUDED.sso_enabled " ,
2021-06-02 18:44:00 +00:00
id ,
_myAddressStr ,
2022-12-07 15:22:54 +00:00
OSUtils : : jsonDump ( config [ " capabilities " ] , - 1 ) ,
2021-10-06 00:02:50 +00:00
( bool ) config [ " enableBroadcast " ] ,
2021-06-02 18:44:00 +00:00
OSUtils : : now ( ) ,
2021-10-06 00:02:50 +00:00
( int ) config [ " mtu " ] ,
( int ) config [ " multicastLimit " ] ,
OSUtils : : jsonString ( config [ " name " ] , " " ) ,
( bool ) config [ " private " ] ,
( int ) config [ " remoteTraceLevel " ] ,
2021-06-02 18:44:00 +00:00
remoteTraceTarget ,
2021-10-06 00:02:50 +00:00
OSUtils : : jsonDump ( config [ " rules " ] , - 1 ) ,
2021-06-02 18:44:00 +00:00
rulesSource ,
2021-10-06 00:02:50 +00:00
OSUtils : : jsonDump ( config [ " tags " ] , - 1 ) ,
OSUtils : : jsonDump ( config [ " v4AssignMode " ] , - 1 ) ,
OSUtils : : jsonDump ( config [ " v6AssignMode " ] , - 1 ) ,
OSUtils : : jsonBool ( config [ " ssoEnabled " ] , false ) ) ;
2021-06-02 18:44:00 +00:00
res = w . exec_params0 ( " DELETE FROM ztc_network_assignment_pool WHERE network_id = $1 " , 0 ) ;
2020-10-05 20:32:47 +00:00
2021-10-06 00:02:50 +00:00
auto pool = config [ " ipAssignmentPools " ] ;
2020-10-05 20:32:47 +00:00
bool err = false ;
for ( auto i = pool . begin ( ) ; i ! = pool . end ( ) ; + + i ) {
std : : string start = ( * i ) [ " ipRangeStart " ] ;
std : : string end = ( * i ) [ " ipRangeEnd " ] ;
2018-11-30 18:37:27 +00:00
2021-06-02 18:44:00 +00:00
res = w . exec_params0 (
2020-10-05 20:32:47 +00:00
" INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
2021-06-02 18:44:00 +00:00
" VALUES ($1, $2, $3) " , id , start , end ) ;
2020-10-05 20:32:47 +00:00
}
2020-10-05 18:02:40 +00:00
2021-06-02 18:44:00 +00:00
res = w . exec_params0 ( " DELETE FROM ztc_network_route WHERE network_id = $1 " , id ) ;
2020-10-05 20:32:47 +00:00
2021-10-06 00:02:50 +00:00
auto routes = config [ " routes " ] ;
2020-10-05 20:32:47 +00:00
err = false ;
for ( auto i = routes . begin ( ) ; i ! = routes . end ( ) ; + + i ) {
std : : string t = ( * i ) [ " target " ] ;
std : : vector < std : : string > target ;
std : : istringstream f ( t ) ;
std : : string s ;
while ( std : : getline ( f , s , ' / ' ) ) {
target . push_back ( s ) ;
2018-09-05 18:49:07 +00:00
}
2020-10-05 20:32:47 +00:00
if ( target . empty ( ) | | target . size ( ) ! = 2 ) {
2018-09-05 18:49:07 +00:00
continue ;
}
2020-10-05 20:32:47 +00:00
std : : string targetAddr = target [ 0 ] ;
std : : string targetBits = target [ 1 ] ;
std : : string via = " NULL " ;
if ( ! ( * i ) [ " via " ] . is_null ( ) ) {
via = ( * i ) [ " via " ] ;
2018-09-05 18:49:07 +00:00
}
2018-11-30 18:37:27 +00:00
2021-06-02 18:44:00 +00:00
res = w . exec_params0 ( " INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4) " ,
id , targetAddr , targetBits , ( via = = " NULL " ? NULL : via . c_str ( ) ) ) ;
2020-10-05 20:32:47 +00:00
}
if ( err ) {
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s: route add error \n " , _myAddressStr . c_str ( ) ) ;
2021-06-02 18:44:00 +00:00
w . abort ( ) ;
_pool - > unborrow ( c ) ;
2020-10-05 20:32:47 +00:00
continue ;
}
2021-06-02 18:44:00 +00:00
2021-10-06 00:02:50 +00:00
auto dns = config [ " dns " ] ;
2020-10-05 20:32:47 +00:00
std : : string domain = dns [ " domain " ] ;
std : : stringstream servers ;
servers < < " { " ;
for ( auto j = dns [ " servers " ] . begin ( ) ; j < dns [ " servers " ] . end ( ) ; + + j ) {
servers < < * j ;
if ( ( j + 1 ) ! = dns [ " servers " ] . end ( ) ) {
servers < < " , " ;
2020-10-05 18:02:40 +00:00
}
2020-10-05 20:32:47 +00:00
}
servers < < " } " ;
2021-06-02 18:44:00 +00:00
std : : string s = servers . str ( ) ;
2020-10-05 20:32:47 +00:00
2021-06-02 18:44:00 +00:00
res = w . exec_params0 ( " INSERT INTO ztc_network_dns (network_id, domain, servers) VALUES ($1, $2, $3) ON CONFLICT (network_id) DO UPDATE SET domain = EXCLUDED.domain, servers = EXCLUDED.servers " ,
id , domain , s ) ;
w . commit ( ) ;
2018-10-24 19:06:17 +00:00
2021-10-06 00:02:50 +00:00
const uint64_t nwidInt = OSUtils : : jsonIntHex ( config [ " nwid " ] , 0ULL ) ;
2018-11-30 18:37:27 +00:00
if ( nwidInt ) {
nlohmann : : json nwOrig ;
2021-10-06 00:02:50 +00:00
nlohmann : : json nwNew ( config ) ;
2018-10-24 19:06:17 +00:00
2018-11-30 18:37:27 +00:00
get ( nwidInt , nwOrig ) ;
2018-10-24 19:06:17 +00:00
2019-08-06 15:42:54 +00:00
_networkChanged ( nwOrig , nwNew , qitem . second ) ;
2018-11-30 18:37:27 +00:00
} else {
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s: Can't notify network changed: %llu \n " , _myAddressStr . c_str ( ) , ( unsigned long long ) nwidInt ) ;
2018-11-30 18:37:27 +00:00
}
} catch ( std : : exception & e ) {
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s ERROR: Error updating network: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
2018-09-05 18:30:17 +00:00
}
2022-06-15 23:05:09 +00:00
if ( _redisMemberStatus ) {
try {
std : : string id = config [ " id " ] ;
std : : string controllerId = _myAddressStr . c_str ( ) ;
std : : string key = " networks:{ " + controllerId + " } " ;
if ( _rc - > clusterMode ) {
_cluster - > sadd ( key , id ) ;
} else {
_redis - > sadd ( key , id ) ;
}
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error adding network to Redis: %s \n " , e . what ( ) ) ;
}
}
2018-09-05 18:49:07 +00:00
} else if ( objtype = = " _delete_network " ) {
2021-09-02 23:22:52 +00:00
// fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
2018-09-05 18:49:07 +00:00
try {
2021-06-02 18:44:00 +00:00
pqxx : : work w ( * c - > c ) ;
2021-10-06 00:02:50 +00:00
std : : string networkId = config [ " nwid " ] ;
2018-11-30 18:37:27 +00:00
2021-06-02 18:44:00 +00:00
pqxx : : result res = w . exec_params0 ( " UPDATE ztc_network SET deleted = true WHERE id = $1 " ,
networkId ) ;
w . commit ( ) ;
2018-09-05 18:49:07 +00:00
} catch ( std : : exception & e ) {
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s ERROR: Error deleting network: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
2018-09-05 18:30:17 +00:00
}
2022-06-15 23:05:09 +00:00
if ( _redisMemberStatus ) {
try {
std : : string id = config [ " id " ] ;
std : : string controllerId = _myAddressStr . c_str ( ) ;
std : : string key = " networks:{ " + controllerId + " } " ;
if ( _rc - > clusterMode ) {
_cluster - > srem ( key , id ) ;
_cluster - > del ( " network-nodes-online:{ " + controllerId + " }: " + id ) ;
} else {
_redis - > srem ( key , id ) ;
_redis - > del ( " network-nodes-online:{ " + controllerId + " }: " + id ) ;
}
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error adding network to Redis: %s \n " , e . what ( ) ) ;
}
}
2021-06-02 18:44:00 +00:00
2018-09-05 18:49:07 +00:00
} else if ( objtype = = " _delete_member " ) {
2021-09-02 23:22:52 +00:00
// fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
2018-09-05 18:49:07 +00:00
try {
2021-06-02 18:44:00 +00:00
pqxx : : work w ( * c - > c ) ;
2021-10-06 00:02:50 +00:00
std : : string memberId = config [ " id " ] ;
std : : string networkId = config [ " nwid " ] ;
2018-09-05 18:49:07 +00:00
2021-06-02 18:44:00 +00:00
pqxx : : result res = w . exec_params0 (
2018-11-30 18:37:27 +00:00
" UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2 " ,
2021-06-02 18:44:00 +00:00
memberId , networkId ) ;
2018-11-30 18:37:27 +00:00
2021-06-02 18:44:00 +00:00
w . commit ( ) ;
2018-09-05 18:49:07 +00:00
} catch ( std : : exception & e ) {
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s ERROR: Error deleting member: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
2018-09-05 18:30:17 +00:00
}
2022-06-15 23:05:09 +00:00
if ( _redisMemberStatus ) {
try {
std : : string memberId = config [ " id " ] ;
std : : string networkId = config [ " nwid " ] ;
std : : string controllerId = _myAddressStr . c_str ( ) ;
std : : string key = " network-nodes-all:{ " + controllerId + " }: " + networkId ;
if ( _rc - > clusterMode ) {
_cluster - > srem ( key , memberId ) ;
_cluster - > del ( " member:{ " + controllerId + " }: " + networkId + " : " + memberId ) ;
} else {
_redis - > srem ( key , memberId ) ;
_redis - > del ( " member:{ " + controllerId + " }: " + networkId + " : " + memberId ) ;
}
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " ERROR: Error deleting member from Redis: %s \n " , e . what ( ) ) ;
}
}
2018-09-05 18:49:07 +00:00
} else {
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s ERROR: unknown objtype \n " , _myAddressStr . c_str ( ) ) ;
2018-09-04 23:05:34 +00:00
}
2018-09-05 18:49:07 +00:00
} catch ( std : : exception & e ) {
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s ERROR: Error getting objtype: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
2018-09-04 23:05:34 +00:00
}
2021-10-05 16:25:24 +00:00
_pool - > unborrow ( c ) ;
c . reset ( ) ;
2020-10-05 20:32:47 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 100 ) ) ;
2018-08-31 21:58:15 +00:00
}
2018-11-30 18:37:27 +00:00
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s commitThread finished \n " , _myAddressStr . c_str ( ) ) ;
2018-08-31 21:58:15 +00:00
}
void PostgreSQL : : onlineNotificationThread ( )
2020-05-14 00:23:27 +00:00
{
2022-05-10 15:36:39 +00:00
waitForReady ( ) ;
if ( _redisMemberStatus ) {
onlineNotification_Redis ( ) ;
} else {
onlineNotification_Postgres ( ) ;
}
2020-05-29 02:22:07 +00:00
}
2022-06-15 23:05:09 +00:00
/**
* ONLY UNCOMMENT FOR TEMPORARY DB MAINTENANCE
*
2022-12-07 15:17:53 +00:00
* This define temporarily turns off writing to the member status table
2022-06-15 23:05:09 +00:00
* so it can be reindexed when the indexes get too large .
*/
// #define DISABLE_MEMBER_STATUS 1
2020-05-29 02:22:07 +00:00
void PostgreSQL : : onlineNotification_Postgres ( )
{
2018-08-31 21:58:15 +00:00
_connected = 1 ;
2020-05-27 02:00:55 +00:00
nlohmann : : json jtmp1 , jtmp2 ;
2020-05-29 02:22:07 +00:00
while ( _run = = 1 ) {
2021-09-02 19:45:19 +00:00
auto c = _pool - > borrow ( ) ;
2021-10-06 16:39:30 +00:00
auto c2 = _pool - > borrow ( ) ;
2021-06-02 18:44:00 +00:00
try {
2021-09-02 23:22:52 +00:00
fprintf ( stderr , " %s onlineNotification_Postgres \n " , _myAddressStr . c_str ( ) ) ;
2021-06-02 18:44:00 +00:00
std : : unordered_map < std : : pair < uint64_t , uint64_t > , std : : pair < int64_t , InetAddress > , _PairHasher > lastOnline ;
{
std : : lock_guard < std : : mutex > l ( _lastOnline_l ) ;
lastOnline . swap ( _lastOnline ) ;
}
2022-06-15 23:05:09 +00:00
# ifndef DISABLE_MEMBER_STATUS
2021-06-02 18:44:00 +00:00
pqxx : : work w ( * c - > c ) ;
2021-10-06 16:39:30 +00:00
pqxx : : work w2 ( * c2 - > c ) ;
2021-06-02 18:44:00 +00:00
2021-10-06 16:39:30 +00:00
fprintf ( stderr , " online notification tick \n " ) ;
2021-06-02 18:44:00 +00:00
bool firstRun = true ;
bool memberAdded = false ;
2021-06-02 21:08:09 +00:00
int updateCount = 0 ;
2021-10-06 16:39:30 +00:00
pqxx : : pipeline pipe ( w ) ;
2021-06-02 18:44:00 +00:00
for ( auto i = lastOnline . begin ( ) ; i ! = lastOnline . end ( ) ; + + i ) {
2021-06-02 21:08:09 +00:00
updateCount + = 1 ;
2021-06-02 18:44:00 +00:00
uint64_t nwid_i = i - > first . first ;
char nwidTmp [ 64 ] ;
char memTmp [ 64 ] ;
char ipTmp [ 64 ] ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
OSUtils : : ztsnprintf ( memTmp , sizeof ( memTmp ) , " %.10llx " , i - > first . second ) ;
if ( ! get ( nwid_i , jtmp1 , i - > first . second , jtmp2 ) ) {
continue ; // skip non existent networks/members
}
2018-09-04 21:00:02 +00:00
2021-06-02 18:44:00 +00:00
std : : string networkId ( nwidTmp ) ;
std : : string memberId ( memTmp ) ;
2019-08-23 16:23:39 +00:00
2021-06-02 18:44:00 +00:00
try {
2021-10-06 16:39:30 +00:00
pqxx : : row r = w2 . exec_params1 ( " SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2 " ,
2021-09-02 19:45:19 +00:00
networkId , memberId ) ;
2021-06-02 18:44:00 +00:00
} catch ( pqxx : : unexpected_rows & e ) {
continue ;
}
2019-04-04 19:40:49 +00:00
int64_t ts = i - > second . first ;
std : : string ipAddr = i - > second . second . toIpString ( ipTmp ) ;
std : : string timestamp = std : : to_string ( ts ) ;
2019-08-23 16:23:39 +00:00
2021-10-06 16:39:30 +00:00
std : : stringstream memberUpdate ;
memberUpdate < < " INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES "
< < " (' " < < networkId < < " ', ' " < < memberId < < " ', " ;
2019-04-04 22:11:01 +00:00
if ( ipAddr . empty ( ) ) {
memberUpdate < < " NULL, " ;
} else {
memberUpdate < < " ' " < < ipAddr < < " ', " ;
}
2021-10-06 16:39:30 +00:00
memberUpdate < < " TO_TIMESTAMP( " < < timestamp < < " ::double precision/1000)) "
< < " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated " ;
2019-04-04 19:40:49 +00:00
2021-10-06 16:39:30 +00:00
pipe . insert ( memberUpdate . str ( ) ) ;
2023-04-25 19:44:18 +00:00
Metrics : : pgsql_node_checkin + + ;
2021-10-06 16:39:30 +00:00
}
while ( ! pipe . empty ( ) ) {
pipe . retrieve ( ) ;
2019-04-04 19:40:49 +00:00
}
2021-10-06 16:39:30 +00:00
pipe . complete ( ) ;
w . commit ( ) ;
2021-09-02 23:32:40 +00:00
fprintf ( stderr , " %s: Updated online status of %d members \n " , _myAddressStr . c_str ( ) , updateCount ) ;
2022-06-15 23:05:09 +00:00
# endif
2021-06-02 18:44:00 +00:00
} catch ( std : : exception & e ) {
fprintf ( stderr , " %s: error in onlinenotification thread: %s \n " , _myAddressStr . c_str ( ) , e . what ( ) ) ;
2021-09-02 19:45:19 +00:00
}
2021-10-06 16:39:30 +00:00
_pool - > unborrow ( c2 ) ;
2021-09-02 19:45:19 +00:00
_pool - > unborrow ( c ) ;
ConnectionPoolStats stats = _pool - > get_stats ( ) ;
2021-09-02 20:48:08 +00:00
fprintf ( stderr , " %s pool stats: in use size: %llu, available size: %llu, total: %llu \n " ,
_myAddressStr . c_str ( ) , stats . borrowed_size , stats . pool_size , ( stats . borrowed_size + stats . pool_size ) ) ;
2018-09-06 22:14:16 +00:00
2020-09-10 20:18:25 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 10 ) ) ;
2020-05-29 02:22:07 +00:00
}
fprintf ( stderr , " %s: Fell out of run loop in onlineNotificationThread \n " , _myAddressStr . c_str ( ) ) ;
if ( _run = = 1 ) {
fprintf ( stderr , " ERROR: %s onlineNotificationThread should still be running! Exiting Controller. \n " , _myAddressStr . c_str ( ) ) ;
exit ( 6 ) ;
}
2018-08-31 21:58:15 +00:00
}
2018-12-03 23:19:15 +00:00
2020-05-14 00:23:27 +00:00
void PostgreSQL : : onlineNotification_Redis ( )
{
_connected = 1 ;
char buf [ 11 ] = { 0 } ;
std : : string controllerId = std : : string ( _myAddress . toString ( buf ) ) ;
while ( _run = = 1 ) {
2022-06-22 17:01:07 +00:00
fprintf ( stderr , " onlineNotification tick \n " ) ;
2022-06-22 16:25:47 +00:00
auto start = std : : chrono : : high_resolution_clock : : now ( ) ;
2022-06-22 17:36:29 +00:00
uint64_t count = 0 ;
2022-06-22 16:25:47 +00:00
2020-05-14 00:23:27 +00:00
std : : unordered_map < std : : pair < uint64_t , uint64_t > , std : : pair < int64_t , InetAddress > , _PairHasher > lastOnline ;
{
std : : lock_guard < std : : mutex > l ( _lastOnline_l ) ;
lastOnline . swap ( _lastOnline ) ;
}
2020-05-20 18:38:04 +00:00
try {
if ( ! lastOnline . empty ( ) ) {
if ( _rc - > clusterMode ) {
2022-06-30 16:46:38 +00:00
auto tx = _cluster - > transaction ( controllerId , true , false ) ;
2022-06-22 17:36:29 +00:00
count = _doRedisUpdate ( tx , controllerId , lastOnline ) ;
2020-05-20 18:38:04 +00:00
} else {
2022-06-30 16:46:38 +00:00
auto tx = _redis - > transaction ( true , false ) ;
2022-06-22 17:36:29 +00:00
count = _doRedisUpdate ( tx , controllerId , lastOnline ) ;
2020-05-20 18:38:04 +00:00
}
2020-05-18 20:58:29 +00:00
}
2020-05-20 18:38:04 +00:00
} catch ( sw : : redis : : Error & e ) {
fprintf ( stderr , " Error in online notification thread (redis): %s \n " , e . what ( ) ) ;
}
2022-06-22 16:25:47 +00:00
auto end = std : : chrono : : high_resolution_clock : : now ( ) ;
auto dur = std : : chrono : : duration_cast < std : : chrono : : milliseconds > ( end - start ) ;
auto total = dur . count ( ) ;
2022-06-22 17:01:07 +00:00
fprintf ( stderr , " onlineNotification ran in %llu ms \n " , total ) ;
2022-06-22 16:25:47 +00:00
2022-06-24 17:02:36 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 5 ) ) ;
2020-05-14 00:23:27 +00:00
}
}
2022-06-22 17:36:29 +00:00
uint64_t PostgreSQL : : _doRedisUpdate ( sw : : redis : : Transaction & tx , std : : string & controllerId ,
2020-05-14 00:23:27 +00:00
std : : unordered_map < std : : pair < uint64_t , uint64_t > , std : : pair < int64_t , InetAddress > , _PairHasher > & lastOnline )
{
nlohmann : : json jtmp1 , jtmp2 ;
2022-05-10 15:36:39 +00:00
uint64_t count = 0 ;
2020-05-14 00:23:27 +00:00
for ( auto i = lastOnline . begin ( ) ; i ! = lastOnline . end ( ) ; + + i ) {
uint64_t nwid_i = i - > first . first ;
uint64_t memberid_i = i - > first . second ;
char nwidTmp [ 64 ] ;
char memTmp [ 64 ] ;
char ipTmp [ 64 ] ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
OSUtils : : ztsnprintf ( memTmp , sizeof ( memTmp ) , " %.10llx " , memberid_i ) ;
if ( ! get ( nwid_i , jtmp1 , memberid_i , jtmp2 ) ) {
continue ; // skip non existent members/networks
}
std : : string networkId ( nwidTmp ) ;
std : : string memberId ( memTmp ) ;
int64_t ts = i - > second . first ;
std : : string ipAddr = i - > second . second . toIpString ( ipTmp ) ;
std : : string timestamp = std : : to_string ( ts ) ;
std : : unordered_map < std : : string , std : : string > record = {
{ " id " , memberId } ,
{ " address " , ipAddr } ,
{ " last_updated " , std : : to_string ( ts ) }
} ;
tx . zadd ( " nodes-online:{ " + controllerId + " } " , memberId , ts )
2020-05-28 03:43:20 +00:00
. zadd ( " nodes-online2:{ " + controllerId + " } " , networkId + " - " + memberId , ts )
2020-05-14 00:23:27 +00:00
. zadd ( " network-nodes-online:{ " + controllerId + " }: " + networkId , memberId , ts )
2020-05-22 17:07:39 +00:00
. zadd ( " active-networks:{ " + controllerId + " } " , networkId , ts )
2020-05-14 00:23:27 +00:00
. sadd ( " network-nodes-all:{ " + controllerId + " }: " + networkId , memberId )
2020-05-20 23:28:28 +00:00
. hmset ( " member:{ " + controllerId + " }: " + networkId + " : " + memberId , record . begin ( ) , record . end ( ) ) ;
2022-05-10 15:36:39 +00:00
+ + count ;
2023-04-25 19:44:18 +00:00
Metrics : : redis_node_checkin + + ;
2020-05-14 00:23:27 +00:00
}
// expire records from all-nodes and network-nodes member list
uint64_t expireOld = OSUtils : : now ( ) - 300000 ;
2022-05-10 15:36:39 +00:00
tx . zremrangebyscore ( " nodes-online:{ " + controllerId + " } " ,
sw : : redis : : RightBoundedInterval < double > ( expireOld ,
sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
tx . zremrangebyscore ( " nodes-online2:{ " + controllerId + " } " ,
sw : : redis : : RightBoundedInterval < double > ( expireOld ,
sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
tx . zremrangebyscore ( " active-networks:{ " + controllerId + " } " ,
sw : : redis : : RightBoundedInterval < double > ( expireOld ,
sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
2020-05-28 03:41:47 +00:00
{
std : : lock_guard < std : : mutex > l ( _networks_l ) ;
for ( const auto & it : _networks ) {
uint64_t nwid_i = it . first ;
char nwidTmp [ 64 ] ;
OSUtils : : ztsnprintf ( nwidTmp , sizeof ( nwidTmp ) , " %.16llx " , nwid_i ) ;
tx . zremrangebyscore ( " network-nodes-online:{ " + controllerId + " }: " + nwidTmp ,
sw : : redis : : RightBoundedInterval < double > ( expireOld , sw : : redis : : BoundType : : LEFT_OPEN ) ) ;
}
2020-05-14 00:23:27 +00:00
}
tx . exec ( ) ;
2022-05-10 15:36:39 +00:00
fprintf ( stderr , " %s: Updated online status of %d members \n " , _myAddressStr . c_str ( ) , count ) ;
2022-06-22 17:36:29 +00:00
return count ;
2020-05-14 00:23:27 +00:00
}
2019-08-06 12:51:50 +00:00
2018-09-28 17:55:39 +00:00
# endif //ZT_CONTROLLER_USE_LIBPQ