Cruftectomy, work in progress on mirrorring

This commit is contained in:
Adam Ierymenko 2019-08-06 08:51:23 -05:00
parent 37d508ab96
commit 3c776675b3
No known key found for this signature in database
GPG Key ID: 1657198823E52A61
12 changed files with 261 additions and 43 deletions

View File

@ -27,6 +27,8 @@
#ifndef ZT_CONTROLLER_DB_HPP
#define ZT_CONTROLLER_DB_HPP
#define ZT_CONTROLLER_USE_LIBPQ
#include "../node/Constants.hpp"
#include "../node/Identity.hpp"
#include "../node/InetAddress.hpp"
@ -99,7 +101,7 @@ public:
bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
void networks(std::vector<uint64_t> &networks);
virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0;
virtual void save(nlohmann::json &record) = 0;
virtual void eraseNetwork(const uint64_t networkId) = 0;
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;

112
controller/DBMirrorSet.cpp Normal file
View File

@ -0,0 +1,112 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* You can be released from the requirements of the license by purchasing
* a commercial license. Buying such a license is mandatory as soon as you
* develop commercial closed-source software that incorporates or links
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#include "DBMirrorSet.hpp"
namespace ZeroTier {
DBMirrorSet::DBMirrorSet()
{
}
DBMirrorSet::~DBMirrorSet()
{
}
bool DBMirrorSet::waitForReady()
{
bool r = false;
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
r |= (*d)->waitForReady();
}
return r;
}
bool DBMirrorSet::isReady()
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if (!(*d)->isReady())
return false;
}
return true;
}
void DBMirrorSet::save(nlohmann::json &record)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->save(record);
}
}
void DBMirrorSet::eraseNetwork(const uint64_t networkId)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->eraseNetwork(networkId);
}
}
void DBMirrorSet::eraseMember(const uint64_t networkId,const uint64_t memberId)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->eraseMember(networkId,memberId);
}
}
void DBMirrorSet::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
(*d)->nodeIsOnline(networkId,memberId,physicalAddress);
}
}
void DBMirrorSet::onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network)
{
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if (d->get() != db) {
}
}
}
void DBMirrorSet::onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
{
}
void DBMirrorSet::onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId)
{
}
void DBMirrorSet::onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress)
{
}
} // namespace ZeroTier

View File

@ -0,0 +1,70 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* You can be released from the requirements of the license by purchasing
* a commercial license. Buying such a license is mandatory as soon as you
* develop commercial closed-source software that incorporates or links
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#ifndef ZT_DBMIRRORSET_HPP
#define ZT_DBMIRRORSET_HPP
#include "DB.hpp"
#include <vector>
#include <memory>
#include <mutex>
namespace ZeroTier {
class DBMirrorSet : public DB::ChangeListener
{
public:
DBMirrorSet();
virtual ~DBMirrorSet();
bool waitForReady();
bool isReady();
void save(nlohmann::json &record);
void eraseNetwork(const uint64_t networkId);
void eraseMember(const uint64_t networkId,const uint64_t memberId);
void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
// These are called by various DB instances when changes occur.
virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network);
virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId);
virtual void onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress);
inline void addDB(const std::shared_ptr<DB> &db)
{
std::lock_guard<std::mutex> l(_dbs_l);
_dbs.push_back(db);
}
private:
std::vector< std::shared_ptr< DB > > _dbs;
std::mutex _dbs_l;
};
} // namespace ZeroTier
#endif

View File

@ -705,7 +705,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
json member,network;
_db->get(nwid,network,address,member);
json origMember(member); // for detecting changes
DB::initMember(member);
try {
@ -799,7 +798,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
member["nwid"] = nwids;
DB::cleanMember(member);
_db->save(&origMember,member);
_db->save(member);
responseBody = OSUtils::jsonDump(member);
responseContentType = "application/json";
@ -830,7 +829,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
json network;
_db->get(nwid,network);
json origNetwork(network); // for detecting changes
DB::initNetwork(network);
try {
@ -1061,7 +1059,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
network["nwid"] = nwids; // legacy
DB::cleanNetwork(network);
_db->save(&origNetwork,network);
_db->save(network);
responseBody = OSUtils::jsonDump(network);
responseContentType = "application/json";
@ -1184,7 +1182,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
d["objtype"] = "trace";
d["ts"] = now;
d["nodeId"] = Utils::hex10(rt.origin,tmp);
_db->save((nlohmann::json *)0,d);
_db->save(d);
} catch ( ... ) {
// drop invalid trace messages if an error occurs
}
@ -1235,7 +1233,7 @@ void EmbeddedNetworkController::_request(
{
char nwids[24];
DB::NetworkSummaryInfo ns;
json network,member,origMember;
json network,member;
if (!_db)
return;
@ -1261,7 +1259,6 @@ void EmbeddedNetworkController::_request(
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND);
return;
}
origMember = member;
const bool newMember = ((!member.is_object())||(member.size() == 0));
DB::initMember(member);
@ -1362,7 +1359,7 @@ void EmbeddedNetworkController::_request(
} else {
// If they are not authorized, STOP!
DB::cleanMember(member);
_db->save(&origMember,member);
_db->save(member);
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED);
return;
}
@ -1734,7 +1731,7 @@ void EmbeddedNetworkController::_request(
}
DB::cleanMember(member);
_db->save(&origMember,member);
_db->save(member);
_sender->ncSendConfig(nwid,requestPacketId,identity.address(),*(nc.get()),metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_VERSION,0) < 6);
}

View File

@ -85,38 +85,35 @@ FileDB::~FileDB()
bool FileDB::waitForReady() { return true; }
bool FileDB::isReady() { return true; }
void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
void FileDB::save(nlohmann::json &record)
{
char p1[4096],p2[4096],pb[4096];
try {
if (orig) {
if (*orig != record) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
}
} else {
record["revision"] = 1;
}
const std::string objtype = record["objtype"];
if (objtype == "network") {
const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
if (nwid) {
nlohmann::json old;
get(nwid,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid);
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
_networkChanged(old,record,true);
}
}
} else if (objtype == "member") {
const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL);
const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL);
if ((id)&&(nwid)) {
nlohmann::json network,old;
get(nwid,network,id,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
OSUtils::ztsnprintf(pb,sizeof(pb),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)nwid);
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json",pb,(unsigned long long)id);
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) {
@ -129,12 +126,7 @@ void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
_memberChanged(old,record,true);
}
}
} else if (objtype == "trace") {
const std::string id = record["id"];
if (id.length() > 0) {
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%s.json",_tracePath.c_str(),id.c_str());
OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1));
}
}
} catch ( ... ) {} // drop invalid records missing fields
}

View File

@ -40,7 +40,7 @@ public:
virtual bool waitForReady();
virtual bool isReady();
virtual void save(nlohmann::json *orig,nlohmann::json &record);
virtual void save(nlohmann::json &record);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);

View File

@ -335,16 +335,8 @@ bool LFDB::isReady()
return (_ready.load());
}
void LFDB::save(nlohmann::json *orig,nlohmann::json &record)
void LFDB::save(nlohmann::json &record)
{
if (orig) {
if (*orig != record) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
}
} else {
record["revision"] = 1;
}
const std::string objtype = record["objtype"];
if (objtype == "network") {
const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
@ -352,6 +344,7 @@ void LFDB::save(nlohmann::json *orig,nlohmann::json &record)
nlohmann::json old;
get(nwid,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
_networkChanged(old,record,true);
{
std::lock_guard<std::mutex> l(_state_l);
@ -366,6 +359,7 @@ void LFDB::save(nlohmann::json *orig,nlohmann::json &record)
nlohmann::json network,old;
get(nwid,network,id,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
_memberChanged(old,record,true);
{
std::lock_guard<std::mutex> l(_state_l);

View File

@ -56,7 +56,7 @@ public:
virtual bool waitForReady();
virtual bool isReady();
virtual void save(nlohmann::json *orig,nlohmann::json &record);
virtual void save(nlohmann::json &record);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);

View File

@ -165,12 +165,35 @@ bool PostgreSQL::isReady()
return ((_ready == 2)&&(_connected));
}
void PostgreSQL::save(nlohmann::json *orig, nlohmann::json &record)
void PostgreSQL::save(nlohmann::json &record)
{
try {
if (!record.is_object()) {
if (!record.is_object())
return;
const std::string objtype = record["objtype"];
if (objtype == "network") {
const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
if (nwid) {
nlohmann::json old;
get(nwid,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
_commitQueue.post(new nlohmann::json(record));
}
}
} else if (objtype == "member") {
const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL);
const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL);
if ((id)&&(nwid)) {
nlohmann::json network,old;
get(nwid,network,id,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
_commitQueue.post(new nlohmann::json(record));
}
}
}
/*
waitForReady();
if (orig) {
if (*orig != record) {
@ -181,6 +204,7 @@ void PostgreSQL::save(nlohmann::json *orig, nlohmann::json &record)
record["revision"] = 1;
_commitQueue.post(new nlohmann::json(record));
}
*/
} catch (std::exception &e) {
fprintf(stderr, "Error on PostgreSQL::save: %s\n", e.what());
} catch (...) {

View File

@ -23,16 +23,14 @@
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#define ZT_CONTROLLER_USE_LIBPQ
#include "DB.hpp"
#ifdef ZT_CONTROLLER_USE_LIBPQ
#ifndef ZT_CONTROLLER_LIBPQ_HPP
#define ZT_CONTROLLER_LIBPQ_HPP
#include "DB.hpp"
#define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4
extern "C" {
@ -57,7 +55,7 @@ public:
virtual bool waitForReady();
virtual bool isReady();
virtual void save(nlohmann::json *orig, nlohmann::json &record);
virtual void save(nlohmann::json &record);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId, const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress);

View File

@ -1,3 +1,30 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* You can be released from the requirements of the license by purchasing
* a commercial license. Buying such a license is mandatory as soon as you
* develop commercial closed-source software that incorporates or links
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
#include "RabbitMQ.hpp"
#ifdef ZT_CONTROLLER_USE_LIBPQ

View File

@ -27,6 +27,8 @@
#ifndef ZT_CONTROLLER_RABBITMQ_HPP
#define ZT_CONTROLLER_RABBITMQ_HPP
#include "DB.hpp"
namespace ZeroTier
{
struct MQConfig {