/* * ZeroTier One - Network Virtualization Everywhere * Copyright (C) 2011-2015 ZeroTier, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #ifndef __WINDOWS__ #include #include #include #include #include #include #endif #include "JSONDB.hpp" #define ZT_JSONDB_HTTP_TIMEOUT 60000 namespace ZeroTier { static const nlohmann::json _EMPTY_JSON(nlohmann::json::object()); static const std::map _ZT_JSONDB_GET_HEADERS; JSONDB::JSONDB(const std::string &basePath) : _basePath(basePath), _rawInput(-1), _rawOutput(-1), _summaryThreadRun(true) { if ((_basePath.length() > 7)&&(_basePath.substr(0,7) == "http://")) { // If base path is http:// we run in HTTP mode // TODO: this doesn't yet support IPv6 since bracketed address notiation isn't supported. // Typically it's just used with 127.0.0.1 anyway. std::string hn = _basePath.substr(7); std::size_t hnend = hn.find_first_of('/'); if (hnend != std::string::npos) hn = hn.substr(0,hnend); std::size_t hnsep = hn.find_last_of(':'); if (hnsep != std::string::npos) hn[hnsep] = '/'; _httpAddr.fromString(hn.c_str()); if (hnend != std::string::npos) _basePath = _basePath.substr(7 + hnend); if (_basePath.length() == 0) _basePath = "/"; if (_basePath[0] != '/') _basePath = std::string("/") + _basePath; #ifndef __WINDOWS__ } else if (_basePath == "-") { // If base path is "-" we run in stdin/stdout mode and expect our database to be populated on startup via stdin // Not supported on Windows _rawInput = STDIN_FILENO; _rawOutput = STDOUT_FILENO; fcntl(_rawInput,F_SETFL,O_NONBLOCK); #endif } else { // Default mode of operation is to store files in the filesystem OSUtils::mkdir(_basePath.c_str()); OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions } if (_rawInput < 0) { unsigned int cnt = 0; while (!_load(_basePath)) { if ((++cnt & 7) == 0) fprintf(stderr,"WARNING: controller still waiting to read '%s'..." ZT_EOL_S,_basePath.c_str()); Thread::sleep(250); } } for(std::unordered_map::iterator n(_networks.begin());n!=_networks.end();++n) _recomputeSummaryInfo(n->first); for(;;) { _summaryThread_m.lock(); if (_summaryThreadToDo.empty()) { _summaryThread_m.unlock(); break; } _summaryThread_m.unlock(); Thread::sleep(50); } } JSONDB::~JSONDB() { { Mutex::Lock _l(_networks_m); _networks.clear(); } Thread t; { Mutex::Lock _l(_summaryThread_m); _summaryThreadRun = false; t = _summaryThread; } if (t) Thread::join(t); } bool JSONDB::writeRaw(const std::string &n,const std::string &obj) { if (_rawOutput >= 0) { #ifndef __WINDOWS__ if (obj.length() > 0) { Mutex::Lock _l(_rawLock); if (write(_rawOutput,obj.c_str(),obj.length() + 1) > 0) return true; } else { return true; } #endif return false; } else if (_httpAddr) { std::map headers; std::string body; std::map reqHeaders; char tmp[64]; OSUtils::ztsnprintf(tmp,sizeof(tmp),"%lu",(unsigned long)obj.length()); reqHeaders["Content-Length"] = tmp; reqHeaders["Content-Type"] = "application/json"; const unsigned int sc = Http::PUT(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast(&_httpAddr),(_basePath+"/"+n).c_str(),reqHeaders,obj.data(),(unsigned long)obj.length(),headers,body); return (sc == 200); } else { const std::string path(_genPath(n,true)); if (!path.length()) return false; return OSUtils::writeFile(path.c_str(),obj); } } bool JSONDB::hasNetwork(const uint64_t networkId) const { Mutex::Lock _l(_networks_m); return (_networks.find(networkId) != _networks.end()); } bool JSONDB::getNetwork(const uint64_t networkId,nlohmann::json &config) const { Mutex::Lock _l(_networks_m); const std::unordered_map::const_iterator i(_networks.find(networkId)); if (i == _networks.end()) return false; config = nlohmann::json::from_msgpack(i->second.config); return true; } bool JSONDB::getNetworkSummaryInfo(const uint64_t networkId,NetworkSummaryInfo &ns) const { Mutex::Lock _l(_networks_m); const std::unordered_map::const_iterator i(_networks.find(networkId)); if (i == _networks.end()) return false; ns = i->second.summaryInfo; return true; } int JSONDB::getNetworkAndMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &networkConfig,nlohmann::json &memberConfig,NetworkSummaryInfo &ns) const { Mutex::Lock _l(_networks_m); const std::unordered_map::const_iterator i(_networks.find(networkId)); if (i == _networks.end()) return 0; const std::unordered_map< uint64_t,std::vector >::const_iterator j(i->second.members.find(nodeId)); if (j == i->second.members.end()) return 1; networkConfig = nlohmann::json::from_msgpack(i->second.config); memberConfig = nlohmann::json::from_msgpack(j->second); ns = i->second.summaryInfo; return 3; } bool JSONDB::getNetworkMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &memberConfig) const { Mutex::Lock _l(_networks_m); const std::unordered_map::const_iterator i(_networks.find(networkId)); if (i == _networks.end()) return false; const std::unordered_map< uint64_t,std::vector >::const_iterator j(i->second.members.find(nodeId)); if (j == i->second.members.end()) return false; memberConfig = nlohmann::json::from_msgpack(j->second); return true; } void JSONDB::saveNetwork(const uint64_t networkId,const nlohmann::json &networkConfig) { char n[64]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); writeRaw(n,OSUtils::jsonDump(networkConfig)); { Mutex::Lock _l(_networks_m); _networks[networkId].config = nlohmann::json::to_msgpack(networkConfig); } _recomputeSummaryInfo(networkId); } void JSONDB::saveNetworkMember(const uint64_t networkId,const uint64_t nodeId,const nlohmann::json &memberConfig) { char n[256]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); writeRaw(n,OSUtils::jsonDump(memberConfig)); { Mutex::Lock _l(_networks_m); _networks[networkId].members[nodeId] = nlohmann::json::to_msgpack(memberConfig); _members[nodeId].insert(networkId); } _recomputeSummaryInfo(networkId); } nlohmann::json JSONDB::eraseNetwork(const uint64_t networkId) { if (!_httpAddr) { // Member deletion is done by Central in harnessed mode, and deleting the cache network entry also deletes all members std::vector memberIds; { Mutex::Lock _l(_networks_m); const std::unordered_map::iterator i(_networks.find(networkId)); if (i == _networks.end()) return _EMPTY_JSON; for(std::unordered_map< uint64_t,std::vector >::iterator m(i->second.members.begin());m!=i->second.members.end();++m) memberIds.push_back(m->first); } for(std::vector::iterator m(memberIds.begin());m!=memberIds.end();++m) eraseNetworkMember(networkId,*m,false); } char n[256]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); if (_rawOutput >= 0) { // In harnessed mode, deletes occur in Central or other management // software and do not need to be executed this way. } else if (_httpAddr) { std::map headers; std::string body; Http::DEL(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast(&_httpAddr),(_basePath+"/"+n).c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); } else { const std::string path(_genPath(n,false)); if (path.length()) OSUtils::rm(path.c_str()); } { Mutex::Lock _l(_networks_m); std::unordered_map::iterator i(_networks.find(networkId)); if (i == _networks.end()) return _EMPTY_JSON; // sanity check, shouldn't happen nlohmann::json tmp(nlohmann::json::from_msgpack(i->second.config)); _networks.erase(i); return tmp; } } nlohmann::json JSONDB::eraseNetworkMember(const uint64_t networkId,const uint64_t nodeId,bool recomputeSummaryInfo) { char n[256]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); if (_rawOutput >= 0) { // In harnessed mode, deletes occur in Central or other management // software and do not need to be executed this way. } else if (_httpAddr) { std::map headers; std::string body; Http::DEL(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast(&_httpAddr),(_basePath+"/"+n).c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); } else { const std::string path(_genPath(n,false)); if (path.length()) OSUtils::rm(path.c_str()); } { Mutex::Lock _l(_networks_m); _members[nodeId].erase(networkId); std::unordered_map::iterator i(_networks.find(networkId)); if (i == _networks.end()) return _EMPTY_JSON; std::unordered_map< uint64_t,std::vector >::iterator j(i->second.members.find(nodeId)); if (j == i->second.members.end()) return _EMPTY_JSON; nlohmann::json tmp(j->second); i->second.members.erase(j); if (recomputeSummaryInfo) _recomputeSummaryInfo(networkId); return tmp; } } void JSONDB::threadMain() throw() { #ifndef __WINDOWS__ fd_set readfds,nullfds; char *const readbuf = (_rawInput >= 0) ? (new char[1048576]) : (char *)0; std::string rawInputBuf; FD_ZERO(&readfds); FD_ZERO(&nullfds); #endif std::vector todo; while (_summaryThreadRun) { #ifndef __WINDOWS__ if (_rawInput < 0) { Thread::sleep(25); } else { FD_SET(_rawInput,&readfds); struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 25000; select(_rawInput+1,&readfds,&nullfds,&nullfds,&tv); if (FD_ISSET(_rawInput,&readfds)) { const long rn = (long)read(_rawInput,readbuf,1048576); for(long i=0;i 0) { _add(OSUtils::jsonParse(rawInputBuf)); rawInputBuf.clear(); } } continue; // we only want to do the stuff below this every few dozen ms or so, so pause again } } #else Thread::sleep(25); #endif { Mutex::Lock _l(_summaryThread_m); if (_summaryThreadToDo.empty()) continue; else _summaryThreadToDo.swap(todo); } const uint64_t now = OSUtils::now(); for(std::vector::iterator ii(todo.begin());ii!=todo.end();++ii) { const uint64_t networkId = *ii; Mutex::Lock _l(_networks_m); std::unordered_map::iterator n(_networks.find(networkId)); if (n != _networks.end()) { NetworkSummaryInfo &ns = n->second.summaryInfo; ns.activeBridges.clear(); ns.allocatedIps.clear(); ns.authorizedMemberCount = 0; ns.activeMemberCount = 0; ns.totalMemberCount = 0; ns.mostRecentDeauthTime = 0; for(std::unordered_map< uint64_t,std::vector >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) { try { nlohmann::json member(nlohmann::json::from_msgpack(m->second)); if (OSUtils::jsonBool(member["authorized"],false)) { ++ns.authorizedMemberCount; try { const nlohmann::json &mlog = member["recentLog"]; if ((mlog.is_array())&&(mlog.size() > 0)) { const nlohmann::json &mlog1 = mlog[0]; if (mlog1.is_object()) { if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2)) ++ns.activeMemberCount; } } } catch ( ... ) {} try { if (OSUtils::jsonBool(member["activeBridge"],false)) ns.activeBridges.push_back(Address(m->first)); } catch ( ... ) {} try { const nlohmann::json &mips = member["ipAssignments"]; if (mips.is_array()) { for(unsigned long i=0;isecond.summaryInfoLastComputed = now; } } todo.clear(); } #ifndef __WINDOWS__ delete [] readbuf; #endif } bool JSONDB::_add(const nlohmann::json &j) { try { if (j.is_object()) { std::string id(OSUtils::jsonString(j["id"],"0")); std::string objtype(OSUtils::jsonString(j["objtype"],"")); if ((id.length() == 16)&&(objtype == "network")) { const uint64_t nwid = Utils::hexStrToU64(id.c_str()); if (nwid) { Mutex::Lock _l(_networks_m); _networks[nwid].config = nlohmann::json::to_msgpack(j); return true; } } else if ((id.length() == 10)&&(objtype == "member")) { const uint64_t mid = Utils::hexStrToU64(id.c_str()); const uint64_t nwid = Utils::hexStrToU64(OSUtils::jsonString(j["nwid"],"0").c_str()); if ((mid)&&(nwid)) { Mutex::Lock _l(_networks_m); _networks[nwid].members[mid] = nlohmann::json::to_msgpack(j); _members[mid].insert(nwid); return true; } } } } catch ( ... ) {} return false; } bool JSONDB::_load(const std::string &p) { // This is not used in stdin/stdout mode. Instead data is populated by // sending it all to stdin. if (_httpAddr) { // In HTTP harnessed mode we download our entire working data set on startup. std::string body; std::map headers; const unsigned int sc = Http::GET(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast(&_httpAddr),_basePath.c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); if (sc == 200) { try { nlohmann::json dbImg(OSUtils::jsonParse(body)); std::string tmp; if (dbImg.is_object()) { Mutex::Lock _l(_networks_m); for(nlohmann::json::iterator i(dbImg.begin());i!=dbImg.end();++i) { try { _add(i.value()); } catch ( ... ) {} } return true; } } catch ( ... ) {} // invalid JSON, so maybe incomplete request } return false; } else { // In regular mode we recursively read it from controller.d/ on disk std::vector dl(OSUtils::listDirectory(p.c_str(),true)); for(std::vector::const_iterator di(dl.begin());di!=dl.end();++di) { if ((di->length() > 5)&&(di->substr(di->length() - 5) == ".json")) { std::string buf; if (OSUtils::readFile((p + ZT_PATH_SEPARATOR_S + *di).c_str(),buf)) { try { _add(OSUtils::jsonParse(buf)); } catch ( ... ) {} } } else { this->_load((p + ZT_PATH_SEPARATOR_S + *di)); } } return true; } } void JSONDB::_recomputeSummaryInfo(const uint64_t networkId) { Mutex::Lock _l(_summaryThread_m); if (std::find(_summaryThreadToDo.begin(),_summaryThreadToDo.end(),networkId) == _summaryThreadToDo.end()) _summaryThreadToDo.push_back(networkId); if (!_summaryThread) _summaryThread = Thread::start(this); } std::string JSONDB::_genPath(const std::string &n,bool create) { std::vector pt(OSUtils::split(n.c_str(),"/","","")); if (pt.size() == 0) return std::string(); char sep; if (_httpAddr) { sep = '/'; create = false; } else { sep = ZT_PATH_SEPARATOR; } std::string p(_basePath); if (create) OSUtils::mkdir(p.c_str()); for(unsigned long i=0,j=(unsigned long)(pt.size()-1);i