Re-integrate in-filesystem DB into new controller DB structure.

This commit is contained in:
Adam Ierymenko 2017-11-07 14:44:46 -08:00
parent 7fc9094d8e
commit 1613f42d00
11 changed files with 654 additions and 362 deletions

314
controller/DB.cpp Normal file
View File

@ -0,0 +1,314 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2015 ZeroTier, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "DB.hpp"
#include "EmbeddedNetworkController.hpp"
#include <chrono>
#include <algorithm>
#include <stdexcept>
using json = nlohmann::json;
namespace ZeroTier {
DB::DB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) :
_controller(nc),
_myAddress(myAddress),
_path((path) ? path : "")
{
{
char tmp[32];
_myAddress.toString(tmp);
_myAddressStr = tmp;
}
}
DB::~DB()
{
}
bool DB::get(const uint64_t networkId,nlohmann::json &network)
{
waitForReady();
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
network = nw->config;
}
return true;
}
bool DB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
{
waitForReady();
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
network = nw->config;
auto m = nw->members.find(memberId);
if (m == nw->members.end())
return false;
member = m->second;
}
return true;
}
bool DB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info)
{
waitForReady();
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
network = nw->config;
_fillSummaryInfo(nw,info);
auto m = nw->members.find(memberId);
if (m == nw->members.end())
return false;
member = m->second;
}
return true;
}
bool DB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
{
waitForReady();
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
network = nw->config;
for(auto m=nw->members.begin();m!=nw->members.end();++m)
members.push_back(m->second);
}
return true;
}
bool DB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
{
waitForReady();
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
_fillSummaryInfo(nw,info);
}
return true;
}
void DB::networks(std::vector<uint64_t> &networks)
{
waitForReady();
std::lock_guard<std::mutex> l(_networks_l);
networks.reserve(_networks.size() + 1);
for(auto n=_networks.begin();n!=_networks.end();++n)
networks.push_back(n->first);
}
void DB::_memberChanged(nlohmann::json &old,nlohmann::json &member,bool push)
{
uint64_t memberId = 0;
uint64_t networkId = 0;
bool isAuth = false;
bool wasAuth = false;
std::shared_ptr<_Network> nw;
if (old.is_object()) {
json &config = old["config"];
if (config.is_object()) {
memberId = OSUtils::jsonIntHex(config["id"],0ULL);
networkId = OSUtils::jsonIntHex(config["nwid"],0ULL);
if ((memberId)&&(networkId)) {
{
std::lock_guard<std::mutex> l(_networks_l);
auto nw2 = _networks.find(networkId);
if (nw2 != _networks.end())
nw = nw2->second;
}
if (nw) {
std::lock_guard<std::mutex> l(nw->lock);
if (OSUtils::jsonBool(config["activeBridge"],false))
nw->activeBridgeMembers.erase(memberId);
wasAuth = OSUtils::jsonBool(config["authorized"],false);
if (wasAuth)
nw->authorizedMembers.erase(memberId);
json &ips = config["ipAssignments"];
if (ips.is_array()) {
for(unsigned long i=0;i<ips.size();++i) {
json &ipj = ips[i];
if (ipj.is_string()) {
const std::string ips = ipj;
InetAddress ipa(ips.c_str());
ipa.setPort(0);
nw->allocatedIps.erase(ipa);
}
}
}
}
}
}
}
if (member.is_object()) {
json &config = member["config"];
if (config.is_object()) {
if (!nw) {
memberId = OSUtils::jsonIntHex(config["id"],0ULL);
networkId = OSUtils::jsonIntHex(config["nwid"],0ULL);
if ((!memberId)||(!networkId))
return;
std::lock_guard<std::mutex> l(_networks_l);
std::shared_ptr<_Network> &nw2 = _networks[networkId];
if (!nw2)
nw2.reset(new _Network);
nw = nw2;
}
{
std::lock_guard<std::mutex> l(nw->lock);
nw->members[memberId] = config;
if (OSUtils::jsonBool(config["activeBridge"],false))
nw->activeBridgeMembers.insert(memberId);
isAuth = OSUtils::jsonBool(config["authorized"],false);
if (isAuth)
nw->authorizedMembers.insert(memberId);
json &ips = config["ipAssignments"];
if (ips.is_array()) {
for(unsigned long i=0;i<ips.size();++i) {
json &ipj = ips[i];
if (ipj.is_string()) {
const std::string ips = ipj;
InetAddress ipa(ips.c_str());
ipa.setPort(0);
nw->allocatedIps.insert(ipa);
}
}
}
if (!isAuth) {
const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL);
if (ldt > nw->mostRecentDeauthTime)
nw->mostRecentDeauthTime = ldt;
}
}
if (push)
_controller->onNetworkMemberUpdate(networkId,memberId);
}
} else if (memberId) {
if (nw) {
std::lock_guard<std::mutex> l(nw->lock);
nw->members.erase(memberId);
}
if (networkId) {
std::lock_guard<std::mutex> l(_networks_l);
auto er = _networkByMember.equal_range(memberId);
for(auto i=er.first;i!=er.second;++i) {
if (i->second == networkId) {
_networkByMember.erase(i);
break;
}
}
}
}
if ((push)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId)))
_controller->onNetworkMemberDeauthorize(networkId,memberId);
}
void DB::_networkChanged(nlohmann::json &old,nlohmann::json &network,bool push)
{
if (network.is_object()) {
json &config = network["config"];
if (config.is_object()) {
const std::string ids = config["id"];
const uint64_t id = Utils::hexStrToU64(ids.c_str());
if (id) {
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
std::shared_ptr<_Network> &nw2 = _networks[id];
if (!nw2)
nw2.reset(new _Network);
nw = nw2;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
nw->config = config;
}
if (push)
_controller->onNetworkUpdate(id);
}
}
} else if (old.is_object()) {
const std::string ids = old["id"];
const uint64_t id = Utils::hexStrToU64(ids.c_str());
if (id) {
std::lock_guard<std::mutex> l(_networks_l);
_networks.erase(id);
}
}
}
void DB::_fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info)
{
for(auto ab=nw->activeBridgeMembers.begin();ab!=nw->activeBridgeMembers.end();++ab)
info.activeBridges.push_back(Address(*ab));
for(auto ip=nw->allocatedIps.begin();ip!=nw->allocatedIps.end();++ip)
info.allocatedIps.push_back(*ip);
info.authorizedMemberCount = (unsigned long)nw->authorizedMembers.size();
info.totalMemberCount = (unsigned long)nw->members.size();
info.mostRecentDeauthTime = nw->mostRecentDeauthTime;
}
} // namespace ZeroTier

116
controller/DB.hpp Normal file
View File

@ -0,0 +1,116 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2015 ZeroTier, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef ZT_CONTROLLER_DB_HPP
#define ZT_CONTROLLER_DB_HPP
#include "../node/Constants.hpp"
#include "../node/Address.hpp"
#include "../node/InetAddress.hpp"
#include "../osdep/OSUtils.hpp"
#include "../osdep/BlockingQueue.hpp"
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <atomic>
#include "../ext/json/json.hpp"
#define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 2
namespace ZeroTier
{
class EmbeddedNetworkController;
/**
* Base class with common infrastructure for all controller DB implementations
*/
class DB
{
public:
struct NetworkSummaryInfo
{
NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {}
std::vector<Address> activeBridges;
std::vector<InetAddress> allocatedIps;
unsigned long authorizedMemberCount;
unsigned long totalMemberCount;
int64_t mostRecentDeauthTime;
};
DB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path);
virtual ~DB();
virtual bool waitForReady() = 0;
inline bool hasNetwork(const uint64_t networkId) const
{
std::lock_guard<std::mutex> l(_networks_l);
return (_networks.find(networkId) != _networks.end());
}
bool get(const uint64_t networkId,nlohmann::json &network);
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
void networks(std::vector<uint64_t> &networks);
virtual void save(const nlohmann::json &record) = 0;
virtual void eraseNetwork(const uint64_t networkId) = 0;
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
protected:
struct _Network
{
_Network() : mostRecentDeauthTime(0) {}
nlohmann::json config;
std::unordered_map<uint64_t,nlohmann::json> members;
std::unordered_set<uint64_t> activeBridgeMembers;
std::unordered_set<uint64_t> authorizedMembers;
std::unordered_set<InetAddress,InetAddress::Hasher> allocatedIps;
int64_t mostRecentDeauthTime;
std::mutex lock;
};
void _memberChanged(nlohmann::json &old,nlohmann::json &member,bool push);
void _networkChanged(nlohmann::json &old,nlohmann::json &network,bool push);
void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info);
EmbeddedNetworkController *const _controller;
const Address _myAddress;
const std::string _path;
std::string _myAddressStr;
std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks;
std::unordered_multimap< uint64_t,uint64_t > _networkByMember;
mutable std::mutex _networks_l;
};
} // namespace ZeroTier
#endif

View File

@ -45,13 +45,18 @@
#ifdef ZT_CONTROLLER_USE_RETHINKDB #ifdef ZT_CONTROLLER_USE_RETHINKDB
#include "RethinkDB.hpp" #include "RethinkDB.hpp"
#else
#include "FileDB.hpp"
#endif #endif
namespace ZeroTier { namespace ZeroTier {
#ifdef ZT_CONTROLLER_USE_RETHINKDB #ifdef ZT_CONTROLLER_USE_RETHINKDB
typedef RethinkDB ControllerDB; typedef RethinkDB ControllerDB;
#else
typedef FileDB ControllerDB;
#endif #endif
class Node; class Node;
class EmbeddedNetworkController : public NetworkController class EmbeddedNetworkController : public NetworkController

129
controller/FileDB.cpp Normal file
View File

@ -0,0 +1,129 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2015 ZeroTier, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "FileDB.hpp"
namespace ZeroTier
{
FileDB::FileDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) :
DB(nc,myAddress,path),
_networksPath(_path + ZT_PATH_SEPARATOR_S + "network")
{
OSUtils::mkdir(_path.c_str());
OSUtils::lockDownFile(_path.c_str(),true);
std::vector<std::string> networks(OSUtils::listDirectory(_networksPath.c_str(),false));
std::string buf;
for(auto n=networks.begin();n!=networks.end();++n) {
buf.clear();
if ((n->length() == 21)&&(OSUtils::readFile((_networksPath + ZT_PATH_SEPARATOR_S + *n).c_str(),buf))) {
try {
nlohmann::json network(OSUtils::jsonParse(buf));
const std::string nwids = network["id"];
if (nwids.length() == 16) {
nlohmann::json nullJson;
_networkChanged(nullJson,network,false);
std::string membersPath(_networksPath + ZT_PATH_SEPARATOR_S + nwids + ZT_PATH_SEPARATOR_S "member");
std::vector<std::string> members(OSUtils::listDirectory(membersPath.c_str(),false));
for(auto m=members.begin();m!=members.end();++m) {
buf.clear();
if ((m->length() == 15)&&(OSUtils::readFile((membersPath + ZT_PATH_SEPARATOR_S + *m).c_str(),buf))) {
try {
nlohmann::json member(OSUtils::jsonParse(buf));
const std::string addrs = member["id"];
if (addrs.length() == 10) {
nlohmann::json nullJson2;
_memberChanged(nullJson2,member,false);
}
} catch ( ... ) {}
}
}
}
} catch ( ... ) {}
}
}
}
FileDB::~FileDB()
{
}
bool FileDB::waitForReady()
{
return true;
}
void FileDB::save(const nlohmann::json &record)
{
char p1[16384],p2[16384];
try {
nlohmann::json rec(record);
const std::string objtype = rec["objtype"];
if (objtype == "network") {
const uint64_t nwid = OSUtils::jsonIntHex(rec["id"],0ULL);
if (nwid) {
nlohmann::json old;
get(nwid,old);
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json.new",_networksPath.c_str(),nwid);
OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid);
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1)))
fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
OSUtils::rename(p1,p2);
_networkChanged(old,rec,true);
}
} else if (objtype == "member") {
const uint64_t id = OSUtils::jsonIntHex(rec["id"],0ULL);
const uint64_t nwid = OSUtils::jsonIntHex(rec["nwid"],0ULL);
if ((id)&&(nwid)) {
nlohmann::json network,old;
get(nwid,network,id,old);
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json.new",_networksPath.c_str(),nwid);
OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json",_networksPath.c_str(),nwid);
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1)))
fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
OSUtils::rename(p1,p2);
_memberChanged(old,rec,true);
}
} else if (objtype == "trace") {
const std::string id = rec["id"];
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "trace" ZT_PATH_SEPARATOR_S "%s.json",_path.c_str(),id.c_str());
OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1));
}
} catch ( ... ) {} // drop invalid records missing fields
}
void FileDB::eraseNetwork(const uint64_t networkId)
{
nlohmann::json network,nullJson;
get(networkId,network);
char p[16384];
OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),networkId);
OSUtils::rm(p);
_networkChanged(network,nullJson,true);
}
void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
{
}
} // namespace ZeroTier

47
controller/FileDB.hpp Normal file
View File

@ -0,0 +1,47 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2015 ZeroTier, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef ZT_CONTROLLER_FILEDB_HPP
#define ZT_CONTROLLER_FILEDB_HPP
#include "DB.hpp"
namespace ZeroTier
{
class FileDB : public DB
{
public:
FileDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path);
virtual ~FileDB();
virtual bool waitForReady();
virtual void save(const nlohmann::json &record);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
protected:
std::string _networksPath;
};
} // namespace ZeroTier
#endif

View File

@ -33,12 +33,12 @@ using json = nlohmann::json;
namespace ZeroTier { namespace ZeroTier {
RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) : RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) :
_controller(nc), DB(nc,myAddress,path),
_myAddress(myAddress),
_ready(2), // two tables need to be synchronized before we're ready, so this is ready when it reaches 0 _ready(2), // two tables need to be synchronized before we're ready, so this is ready when it reaches 0
_run(1), _run(1),
_waitNoticePrinted(false) _waitNoticePrinted(false)
{ {
// rethinkdb:host:port:db[:auth]
std::vector<std::string> ps(OSUtils::split(path,":","","")); std::vector<std::string> ps(OSUtils::split(path,":","",""));
if ((ps.size() < 4)||(ps[0] != "rethinkdb")) if ((ps.size() < 4)||(ps[0] != "rethinkdb"))
throw std::runtime_error("invalid rethinkdb database url"); throw std::runtime_error("invalid rethinkdb database url");
@ -50,12 +50,6 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
_readyLock.lock(); _readyLock.lock();
{
char tmp[32];
_myAddress.toString(tmp);
_myAddressStr = tmp;
}
_membersDbWatcher = std::thread([this]() { _membersDbWatcher = std::thread([this]() {
try { try {
while (_run == 1) { while (_run == 1) {
@ -79,7 +73,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
json &nv = tmp["new_val"]; json &nv = tmp["new_val"];
if (ov.is_object()||nv.is_object()) { if (ov.is_object()||nv.is_object()) {
//if (nv.is_object()) printf("MEMBER: %s" ZT_EOL_S,nv.dump().c_str()); //if (nv.is_object()) printf("MEMBER: %s" ZT_EOL_S,nv.dump().c_str());
this->_memberChanged(ov,nv); this->_memberChanged(ov,nv,(this->_ready <= 0));
} }
} catch ( ... ) {} // ignore bad records } catch ( ... ) {} // ignore bad records
} }
@ -120,7 +114,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
json &nv = tmp["new_val"]; json &nv = tmp["new_val"];
if (ov.is_object()||nv.is_object()) { if (ov.is_object()||nv.is_object()) {
//if (nv.is_object()) printf("NETWORK: %s" ZT_EOL_S,nv.dump().c_str()); //if (nv.is_object()) printf("NETWORK: %s" ZT_EOL_S,nv.dump().c_str());
this->_networkChanged(ov,nv); this->_networkChanged(ov,nv,(this->_ready <= 0));
} }
} catch ( ... ) {} // ignore bad records } catch ( ... ) {} // ignore bad records
} }
@ -166,18 +160,18 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
record["controllerId"] = this->_myAddressStr; record["controllerId"] = this->_myAddressStr;
record["config"] = *config; record["config"] = *config;
table = "Network"; table = "Network";
} else if (objtype == "delete_network") { } else if (objtype == "trace") {
record = *config;
table = "RemoteTrace";
} else if (objtype == "_delete_network") {
deleteId = (*config)["id"]; deleteId = (*config)["id"];
table = "Network"; table = "Network";
} else if (objtype == "delete_member") { } else if (objtype == "_delete_member") {
deleteId = (*config)["nwid"]; deleteId = (*config)["nwid"];
deleteId.push_back('-'); deleteId.push_back('-');
const std::string tmp = (*config)["id"]; const std::string tmp = (*config)["id"];
deleteId.append(tmp); deleteId.append(tmp);
table = "Member"; table = "Member";
} else if (objtype == "trace") {
record = *config;
table = "RemoteTrace";
} else { } else {
delete config; delete config;
continue; continue;
@ -259,114 +253,16 @@ RethinkDB::~RethinkDB()
_heartbeatThread.join(); _heartbeatThread.join();
} }
bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) void RethinkDB::waitForReady() const
{ {
waitForReady(); while (_ready > 0) {
std::shared_ptr<_Network> nw; if (!_waitNoticePrinted) {
{ _waitNoticePrinted = true;
std::lock_guard<std::mutex> l(_networks_l); fprintf(stderr,"NOTICE: controller RethinkDB waiting for initial data download..." ZT_EOL_S);
auto nwi = _networks.find(networkId); }
if (nwi == _networks.end()) _readyLock.lock();
return false; _readyLock.unlock();
nw = nwi->second;
} }
{
std::lock_guard<std::mutex> l2(nw->lock);
network = nw->config;
}
return true;
}
bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
{
waitForReady();
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
network = nw->config;
auto m = nw->members.find(memberId);
if (m == nw->members.end())
return false;
member = m->second;
}
return true;
}
bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info)
{
waitForReady();
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
network = nw->config;
_fillSummaryInfo(nw,info);
auto m = nw->members.find(memberId);
if (m == nw->members.end())
return false;
member = m->second;
}
return true;
}
bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
{
waitForReady();
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
network = nw->config;
for(auto m=nw->members.begin();m!=nw->members.end();++m)
members.push_back(m->second);
}
return true;
}
bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
{
waitForReady();
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
auto nwi = _networks.find(networkId);
if (nwi == _networks.end())
return false;
nw = nwi->second;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
_fillSummaryInfo(nw,info);
}
return true;
}
void RethinkDB::networks(std::vector<uint64_t> &networks)
{
waitForReady();
std::lock_guard<std::mutex> l(_networks_l);
networks.reserve(_networks.size() + 1);
for(auto n=_networks.begin();n!=_networks.end();++n)
networks.push_back(n->first);
} }
void RethinkDB::save(const nlohmann::json &record) void RethinkDB::save(const nlohmann::json &record)
@ -382,7 +278,7 @@ void RethinkDB::eraseNetwork(const uint64_t networkId)
Utils::hex(networkId,tmp2); Utils::hex(networkId,tmp2);
json *tmp = new json(); json *tmp = new json();
(*tmp)["id"] = tmp2; (*tmp)["id"] = tmp2;
(*tmp)["objtype"] = "delete_network"; // pseudo-type, tells thread to delete network (*tmp)["objtype"] = "_delete_network"; // pseudo-type, tells thread to delete network
_commitQueue.post(tmp); _commitQueue.post(tmp);
} }
@ -395,155 +291,10 @@ void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
(*tmp)["nwid"] = tmp2; (*tmp)["nwid"] = tmp2;
Utils::hex10(memberId,tmp2); Utils::hex10(memberId,tmp2);
(*tmp)["id"] = tmp2; (*tmp)["id"] = tmp2;
(*tmp)["objtype"] = "delete_member"; // pseudo-type, tells thread to delete network (*tmp)["objtype"] = "_delete_member"; // pseudo-type, tells thread to delete network
_commitQueue.post(tmp); _commitQueue.post(tmp);
} }
void RethinkDB::_memberChanged(nlohmann::json &old,nlohmann::json &member)
{
uint64_t memberId = 0;
uint64_t networkId = 0;
bool isAuth = false;
bool wasAuth = false;
std::shared_ptr<_Network> nw;
if (old.is_object()) {
json &config = old["config"];
if (config.is_object()) {
memberId = OSUtils::jsonIntHex(config["id"],0ULL);
networkId = OSUtils::jsonIntHex(config["nwid"],0ULL);
if ((memberId)&&(networkId)) {
{
std::lock_guard<std::mutex> l(_networks_l);
auto nw2 = _networks.find(networkId);
if (nw2 != _networks.end())
nw = nw2->second;
}
if (nw) {
std::lock_guard<std::mutex> l(nw->lock);
if (OSUtils::jsonBool(config["activeBridge"],false))
nw->activeBridgeMembers.erase(memberId);
wasAuth = OSUtils::jsonBool(config["authorized"],false);
if (wasAuth)
nw->authorizedMembers.erase(memberId);
json &ips = config["ipAssignments"];
if (ips.is_array()) {
for(unsigned long i=0;i<ips.size();++i) {
json &ipj = ips[i];
if (ipj.is_string()) {
const std::string ips = ipj;
InetAddress ipa(ips.c_str());
ipa.setPort(0);
nw->allocatedIps.erase(ipa);
}
}
}
}
}
}
}
if (member.is_object()) {
json &config = member["config"];
if (config.is_object()) {
if (!nw) {
memberId = OSUtils::jsonIntHex(config["id"],0ULL);
networkId = OSUtils::jsonIntHex(config["nwid"],0ULL);
if ((!memberId)||(!networkId))
return;
std::lock_guard<std::mutex> l(_networks_l);
std::shared_ptr<_Network> &nw2 = _networks[networkId];
if (!nw2)
nw2.reset(new _Network);
nw = nw2;
}
{
std::lock_guard<std::mutex> l(nw->lock);
nw->members[memberId] = config;
if (OSUtils::jsonBool(config["activeBridge"],false))
nw->activeBridgeMembers.insert(memberId);
isAuth = OSUtils::jsonBool(config["authorized"],false);
if (isAuth)
nw->authorizedMembers.insert(memberId);
json &ips = config["ipAssignments"];
if (ips.is_array()) {
for(unsigned long i=0;i<ips.size();++i) {
json &ipj = ips[i];
if (ipj.is_string()) {
const std::string ips = ipj;
InetAddress ipa(ips.c_str());
ipa.setPort(0);
nw->allocatedIps.insert(ipa);
}
}
}
if (!isAuth) {
const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL);
if (ldt > nw->mostRecentDeauthTime)
nw->mostRecentDeauthTime = ldt;
}
}
_controller->onNetworkMemberUpdate(networkId,memberId);
}
} else if (memberId) {
if (nw) {
std::lock_guard<std::mutex> l(nw->lock);
nw->members.erase(memberId);
}
if (networkId) {
std::lock_guard<std::mutex> l(_networks_l);
auto er = _networkByMember.equal_range(memberId);
for(auto i=er.first;i!=er.second;++i) {
if (i->second == networkId) {
_networkByMember.erase(i);
break;
}
}
}
}
if ((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))
_controller->onNetworkMemberDeauthorize(networkId,memberId);
}
void RethinkDB::_networkChanged(nlohmann::json &old,nlohmann::json &network)
{
if (network.is_object()) {
json &config = network["config"];
if (config.is_object()) {
const std::string ids = config["id"];
const uint64_t id = Utils::hexStrToU64(ids.c_str());
if (id) {
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
std::shared_ptr<_Network> &nw2 = _networks[id];
if (!nw2)
nw2.reset(new _Network);
nw = nw2;
}
{
std::lock_guard<std::mutex> l2(nw->lock);
nw->config = config;
}
_controller->onNetworkUpdate(id);
}
}
} else if (old.is_object()) {
const std::string ids = old["id"];
const uint64_t id = Utils::hexStrToU64(ids.c_str());
if (id) {
std::lock_guard<std::mutex> l(_networks_l);
_networks.erase(id);
}
}
}
} // namespace ZeroTier } // namespace ZeroTier
#endif // ZT_CONTROLLER_USE_RETHINKDB #endif // ZT_CONTROLLER_USE_RETHINKDB

View File

@ -16,112 +16,35 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define ZT_CONTROLLER_USE_RETHINKDB
#ifdef ZT_CONTROLLER_USE_RETHINKDB #ifdef ZT_CONTROLLER_USE_RETHINKDB
#ifndef ZT_CONTROLLER_RETHINKDB_HPP #ifndef ZT_CONTROLLER_RETHINKDB_HPP
#define ZT_CONTROLLER_RETHINKDB_HPP #define ZT_CONTROLLER_RETHINKDB_HPP
#include "../node/Constants.hpp" #include "DB.hpp"
#include "../node/Address.hpp"
#include "../node/InetAddress.hpp"
#include "../osdep/OSUtils.hpp"
#include "../osdep/BlockingQueue.hpp"
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <atomic>
#include "../ext/json/json.hpp"
#define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 2 #define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 2
namespace ZeroTier namespace ZeroTier
{ {
class EmbeddedNetworkController; class RethinkDB : public DB
class RethinkDB
{ {
public: public:
struct NetworkSummaryInfo
{
NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {}
std::vector<Address> activeBridges;
std::vector<InetAddress> allocatedIps;
unsigned long authorizedMemberCount;
unsigned long totalMemberCount;
int64_t mostRecentDeauthTime;
};
RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path); RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path);
~RethinkDB(); virtual ~RethinkDB();
inline void waitForReady() const virtual void waitForReady() const;
{
while (_ready > 0) {
if (!_waitNoticePrinted) {
_waitNoticePrinted = true;
fprintf(stderr,"NOTICE: controller RethinkDB waiting for initial data download..." ZT_EOL_S);
}
_readyLock.lock();
_readyLock.unlock();
}
}
inline bool hasNetwork(const uint64_t networkId) const virtual void save(const nlohmann::json &record);
{
std::lock_guard<std::mutex> l(_networks_l);
return (_networks.find(networkId) != _networks.end());
}
bool get(const uint64_t networkId,nlohmann::json &network); virtual void eraseNetwork(const uint64_t networkId);
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
bool summary(const uint64_t networkId,NetworkSummaryInfo &info); virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
void networks(std::vector<uint64_t> &networks); protected:
void save(const nlohmann::json &record);
void eraseNetwork(const uint64_t networkId);
void eraseMember(const uint64_t networkId,const uint64_t memberId);
private:
struct _Network
{
_Network() : mostRecentDeauthTime(0) {}
nlohmann::json config;
std::unordered_map<uint64_t,nlohmann::json> members;
std::unordered_set<uint64_t> activeBridgeMembers;
std::unordered_set<uint64_t> authorizedMembers;
std::unordered_set<InetAddress,InetAddress::Hasher> allocatedIps;
int64_t mostRecentDeauthTime;
std::mutex lock;
};
void _memberChanged(nlohmann::json &old,nlohmann::json &member);
void _networkChanged(nlohmann::json &old,nlohmann::json &network);
inline void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info)
{
for(auto ab=nw->activeBridgeMembers.begin();ab!=nw->activeBridgeMembers.end();++ab)
info.activeBridges.push_back(Address(*ab));
for(auto ip=nw->allocatedIps.begin();ip!=nw->allocatedIps.end();++ip)
info.allocatedIps.push_back(*ip);
info.authorizedMemberCount = (unsigned long)nw->authorizedMembers.size();
info.totalMemberCount = (unsigned long)nw->members.size();
info.mostRecentDeauthTime = nw->mostRecentDeauthTime;
}
EmbeddedNetworkController *const _controller;
const Address _myAddress;
std::string _myAddressStr;
std::string _host; std::string _host;
std::string _db; std::string _db;
std::string _auth; std::string _auth;
@ -132,10 +55,6 @@ private:
std::thread _networksDbWatcher; std::thread _networksDbWatcher;
std::thread _membersDbWatcher; std::thread _membersDbWatcher;
std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks;
std::unordered_multimap< uint64_t,uint64_t > _networkByMember;
mutable std::mutex _networks_l;
BlockingQueue< nlohmann::json * > _commitQueue; BlockingQueue< nlohmann::json * > _commitQueue;
std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS]; std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS];

View File

@ -28,6 +28,8 @@ CORE_OBJS=\
ONE_OBJS=\ ONE_OBJS=\
controller/EmbeddedNetworkController.o \ controller/EmbeddedNetworkController.o \
controller/DB.o \
controller/FileDB.o \
controller/RethinkDB.o \ controller/RethinkDB.o \
osdep/ManagedRoute.o \ osdep/ManagedRoute.o \
osdep/Http.o \ osdep/Http.o \

View File

@ -101,7 +101,6 @@ public:
* @return True if delete was successful * @return True if delete was successful
*/ */
static inline bool rm(const char *path) static inline bool rm(const char *path)
throw()
{ {
#ifdef __WINDOWS__ #ifdef __WINDOWS__
return (DeleteFileA(path) != FALSE); return (DeleteFileA(path) != FALSE);
@ -109,7 +108,7 @@ public:
return (unlink(path) == 0); return (unlink(path) == 0);
#endif #endif
} }
static inline bool rm(const std::string &path) throw() { return rm(path.c_str()); } static inline bool rm(const std::string &path) { return rm(path.c_str()); }
static inline bool mkdir(const char *path) static inline bool mkdir(const char *path)
{ {
@ -123,7 +122,17 @@ public:
return true; return true;
#endif #endif
} }
static inline bool mkdir(const std::string &path) throw() { return OSUtils::mkdir(path.c_str()); } static inline bool mkdir(const std::string &path) { return OSUtils::mkdir(path.c_str()); }
static inline bool rename(const char *o,const char *n)
{
#ifdef __WINDOWS__
DeleteFileA(n);
return (::rename(o,n) == 0);
#else
return (::rename(o,n) == 0);
#endif
}
/** /**
* List a directory's contents * List a directory's contents