diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp index 0c0612668..7f92d6ee0 100644 --- a/controller/JSONDB.cpp +++ b/controller/JSONDB.cpp @@ -16,6 +16,18 @@ * along with this program. If not, see . */ +#include +#include +#include +#ifndef __WINDOWS__ +#include +#include +#include +#include +#include +#include +#endif + #include "JSONDB.hpp" #define ZT_JSONDB_HTTP_TIMEOUT 60000 @@ -27,9 +39,12 @@ static const std::map _ZT_JSONDB_GET_HEADERS; JSONDB::JSONDB(const std::string &basePath) : _basePath(basePath), + _rawInput(-1), + _rawOutput(-1), _summaryThreadRun(true) { if ((_basePath.length() > 7)&&(_basePath.substr(0,7) == "http://")) { + // If base path is http:// we run in HTTP mode // TODO: this doesn't yet support IPv6 since bracketed address notiation isn't supported. // Typically it's just used with 127.0.0.1 anyway. std::string hn = _basePath.substr(7); @@ -46,16 +61,27 @@ JSONDB::JSONDB(const std::string &basePath) : _basePath = "/"; if (_basePath[0] != '/') _basePath = std::string("/") + _basePath; +#ifndef __WINDOWS__ + } else if (_basePath == "-") { + // If base path is "-" we run in stdin/stdout mode and expect our database to be populated on startup via stdin + // Not supported on Windows + _rawInput = STDIN_FILENO; + _rawOutput = STDOUT_FILENO; + fcntl(_rawInput,F_SETFL,O_NONBLOCK); +#endif } else { + // Default mode of operation is to store files in the filesystem OSUtils::mkdir(_basePath.c_str()); OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions } - unsigned int cnt = 0; - while (!_load(_basePath)) { - if ((++cnt & 7) == 0) - fprintf(stderr,"WARNING: controller still waiting to read '%s'..." ZT_EOL_S,_basePath.c_str()); - Thread::sleep(250); + if (_rawInput < 0) { + unsigned int cnt = 0; + while (!_load(_basePath)) { + if ((++cnt & 7) == 0) + fprintf(stderr,"WARNING: controller still waiting to read '%s'..." ZT_EOL_S,_basePath.c_str()); + Thread::sleep(250); + } } for(std::unordered_map::iterator n(_networks.begin());n!=_networks.end();++n) @@ -89,7 +115,18 @@ JSONDB::~JSONDB() bool JSONDB::writeRaw(const std::string &n,const std::string &obj) { - if (_httpAddr) { + if (_rawOutput >= 0) { +#ifndef __WINDOWS__ + if (obj.length() > 0) { + Mutex::Lock _l(_rawLock); + if (write(_rawOutput,obj.c_str(),obj.length() + 1) > 0) + return true; + } else { + return true; + } +#endif + return false; + } else if (_httpAddr) { std::map headers; std::string body; std::map reqHeaders; @@ -205,11 +242,13 @@ nlohmann::json JSONDB::eraseNetwork(const uint64_t networkId) char n[256]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); - if (_httpAddr) { - // Deletion is currently done by Central in harnessed mode - //std::map headers; - //std::string body; - //Http::DEL(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast(&_httpAddr),(_basePath+"/"+n).c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); + if (_rawOutput >= 0) { + // In harnessed mode, deletes occur in Central or other management + // software and do not need to be executed this way. + } else if (_httpAddr) { + std::map headers; + std::string body; + Http::DEL(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast(&_httpAddr),(_basePath+"/"+n).c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); } else { const std::string path(_genPath(n,false)); if (path.length()) @@ -232,11 +271,13 @@ nlohmann::json JSONDB::eraseNetworkMember(const uint64_t networkId,const uint64_ char n[256]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); - if (_httpAddr) { - // Deletion is currently done by the caller in Central harnessed mode - //std::map headers; - //std::string body; - //Http::DEL(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast(&_httpAddr),(_basePath+"/"+n).c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); + if (_rawOutput >= 0) { + // In harnessed mode, deletes occur in Central or other management + // software and do not need to be executed this way. + } else if (_httpAddr) { + std::map headers; + std::string body; + Http::DEL(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast(&_httpAddr),(_basePath+"/"+n).c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); } else { const std::string path(_genPath(n,false)); if (path.length()) @@ -263,9 +304,41 @@ nlohmann::json JSONDB::eraseNetworkMember(const uint64_t networkId,const uint64_ void JSONDB::threadMain() throw() { +#ifndef __WINDOWS__ + fd_set readfds,nullfds; + char *const readbuf = (_rawInput >= 0) ? (new char[1048576]) : (char *)0; + std::string rawInputBuf; + FD_ZERO(&readfds); + FD_ZERO(&nullfds); +#endif + std::vector todo; + while (_summaryThreadRun) { - Thread::sleep(10); +#ifndef __WINDOWS__ + if (_rawInput < 0) { + Thread::sleep(25); + } else { + FD_SET(_rawInput,&readfds); + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 25000; + select(_rawInput+1,&readfds,&nullfds,&nullfds,&tv); + if (FD_ISSET(_rawInput,&readfds)) { + const long rn = (long)read(_rawInput,readbuf,1048576); + for(long i=0;i 0) { + _add(OSUtils::jsonParse(rawInputBuf)); + rawInputBuf.clear(); + } + } + } + } +#else + Thread::sleep(25); +#endif { Mutex::Lock _l(_summaryThread_m); @@ -273,7 +346,6 @@ void JSONDB::threadMain() continue; else _summaryThreadToDo.swap(todo); } - const uint64_t now = OSUtils::now(); for(std::vector::iterator ii(todo.begin());ii!=todo.end();++ii) { const uint64_t networkId = *ii; @@ -340,10 +412,46 @@ void JSONDB::threadMain() todo.clear(); } + +#ifndef __WINDOWS__ + delete [] readbuf; +#endif +} + +bool JSONDB::_add(const nlohmann::json &j) +{ + try { + if (j.is_object()) { + std::string id(OSUtils::jsonString(j["id"],"0")); + std::string objtype(OSUtils::jsonString(j["objtype"],"")); + + if ((id.length() == 16)&&(objtype == "network")) { + const uint64_t nwid = Utils::hexStrToU64(id.c_str()); + if (nwid) { + Mutex::Lock _l(_networks_m); + _networks[nwid].config = nlohmann::json::to_msgpack(j); + return true; + } + } else if ((id.length() == 10)&&(objtype == "member")) { + const uint64_t mid = Utils::hexStrToU64(id.c_str()); + const uint64_t nwid = Utils::hexStrToU64(OSUtils::jsonString(j["nwid"],"0").c_str()); + if ((mid)&&(nwid)) { + Mutex::Lock _l(_networks_m); + _networks[nwid].members[mid] = nlohmann::json::to_msgpack(j); + _members[mid].insert(nwid); + return true; + } + } + } + } catch ( ... ) {} + return false; } bool JSONDB::_load(const std::string &p) { + // This is not used in stdin/stdout mode. Instead data is populated by + // sending it all to stdin. + if (_httpAddr) { // In HTTP harnessed mode we download our entire working data set on startup. @@ -357,24 +465,9 @@ bool JSONDB::_load(const std::string &p) if (dbImg.is_object()) { Mutex::Lock _l(_networks_m); for(nlohmann::json::iterator i(dbImg.begin());i!=dbImg.end();++i) { - nlohmann::json &j = i.value(); - if (j.is_object()) { - std::string id(OSUtils::jsonString(j["id"],"0")); - std::string objtype(OSUtils::jsonString(j["objtype"],"")); - - if ((id.length() == 16)&&(objtype == "network")) { - const uint64_t nwid = Utils::hexStrToU64(id.c_str()); - if (nwid) - _networks[nwid].config = nlohmann::json::to_msgpack(j); - } else if ((id.length() == 10)&&(objtype == "member")) { - const uint64_t mid = Utils::hexStrToU64(id.c_str()); - const uint64_t nwid = Utils::hexStrToU64(OSUtils::jsonString(j["nwid"],"0").c_str()); - if ((mid)&&(nwid)) { - _networks[nwid].members[mid] = nlohmann::json::to_msgpack(j); - _members[mid].insert(nwid); - } - } - } + try { + _add(i.value()); + } catch ( ... ) {} } return true; } @@ -391,25 +484,7 @@ bool JSONDB::_load(const std::string &p) std::string buf; if (OSUtils::readFile((p + ZT_PATH_SEPARATOR_S + *di).c_str(),buf)) { try { - nlohmann::json j(OSUtils::jsonParse(buf)); - std::string id(OSUtils::jsonString(j["id"],"0")); - std::string objtype(OSUtils::jsonString(j["objtype"],"")); - - if ((id.length() == 16)&&(objtype == "network")) { - const uint64_t nwid = Utils::hexStrToU64(id.c_str()); - if (nwid) { - Mutex::Lock _l(_networks_m); - _networks[nwid].config = nlohmann::json::to_msgpack(j); - } - } else if ((id.length() == 10)&&(objtype == "member")) { - const uint64_t mid = Utils::hexStrToU64(id.c_str()); - const uint64_t nwid = Utils::hexStrToU64(OSUtils::jsonString(j["nwid"],"0").c_str()); - if ((mid)&&(nwid)) { - Mutex::Lock _l(_networks_m); - _networks[nwid].members[mid] = nlohmann::json::to_msgpack(j); - _members[mid].insert(nwid); - } - } + _add(OSUtils::jsonParse(buf)); } catch ( ... ) {} } } else { diff --git a/controller/JSONDB.hpp b/controller/JSONDB.hpp index 99b69ba23..23d00a512 100644 --- a/controller/JSONDB.hpp +++ b/controller/JSONDB.hpp @@ -145,12 +145,15 @@ public: throw(); private: + bool _add(const nlohmann::json &j); bool _load(const std::string &p); void _recomputeSummaryInfo(const uint64_t networkId); std::string _genPath(const std::string &n,bool create); std::string _basePath; InetAddress _httpAddr; + int _rawInput,_rawOutput; + Mutex _rawLock; Thread _summaryThread; std::vector _summaryThreadToDo; diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index 51955bf3f..e5e104761 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -81,14 +81,12 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr) if (peer) { if (!trusted) { if (!dearmor(peer->key())) { - //fprintf(stderr,"dropped packet from %s(%s), MAC authentication failed (size: %u)" ZT_EOL_S,sourceAddress.toString().c_str(),_path->address().toString().c_str(),size()); RR->t->incomingPacketMessageAuthenticationFailure(tPtr,_path,packetId(),sourceAddress,hops()); return true; } } if (!uncompress()) { - //fprintf(stderr,"dropped packet from %s(%s), compressed data invalid (size %u, verb may be %u)" ZT_EOL_S,sourceAddress.toString().c_str(),_path->address().toString().c_str(),size(),(unsigned int)verb()); RR->t->incomingPacketInvalid(tPtr,_path,packetId(),sourceAddress,hops(),Packet::VERB_NOP,"LZ4 decompression failed"); return true; } diff --git a/service/SoftwareUpdater.cpp b/service/SoftwareUpdater.cpp index 57ecce78c..110059456 100644 --- a/service/SoftwareUpdater.cpp +++ b/service/SoftwareUpdater.cpp @@ -243,7 +243,6 @@ void SoftwareUpdater::handleSoftwareUpdateUserMessage(uint64_t origin,const void gd.append(_downloadHashPrefix.data,16); gd.append((uint32_t)_download.length()); _node.sendUserMessage((void *)0,ZT_SOFTWARE_UPDATE_SERVICE,ZT_SOFTWARE_UPDATE_USER_MESSAGE_TYPE,gd.data(),gd.size()); - //printf(">> GET_DATA @%u\n",(unsigned int)_download.length()); } } } @@ -258,7 +257,6 @@ void SoftwareUpdater::handleSoftwareUpdateUserMessage(uint64_t origin,const void idx |= (unsigned long)*(reinterpret_cast(data) + 18) << 16; idx |= (unsigned long)*(reinterpret_cast(data) + 19) << 8; idx |= (unsigned long)*(reinterpret_cast(data) + 20); - //printf("<< GET_DATA @%u from %.10llx for %s\n",(unsigned int)idx,origin,Utils::hex(reinterpret_cast(data) + 1,16).c_str()); std::map< Array,_D >::iterator d(_dist.find(Array(reinterpret_cast(data) + 1))); if ((d != _dist.end())&&(idx < (unsigned long)d->second.bin.length())) { Buffer buf; @@ -267,7 +265,6 @@ void SoftwareUpdater::handleSoftwareUpdateUserMessage(uint64_t origin,const void buf.append((uint32_t)idx); buf.append(d->second.bin.data() + idx,std::min((unsigned long)ZT_SOFTWARE_UPDATE_CHUNK_SIZE,(unsigned long)(d->second.bin.length() - idx))); _node.sendUserMessage((void *)0,origin,ZT_SOFTWARE_UPDATE_USER_MESSAGE_TYPE,buf.data(),buf.size()); - //printf(">> DATA @%u\n",(unsigned int)idx); } } break; @@ -278,7 +275,6 @@ void SoftwareUpdater::handleSoftwareUpdateUserMessage(uint64_t origin,const void idx |= (unsigned long)*(reinterpret_cast(data) + 18) << 16; idx |= (unsigned long)*(reinterpret_cast(data) + 19) << 8; idx |= (unsigned long)*(reinterpret_cast(data) + 20); - //printf("<< DATA @%u / %u bytes (we now have %u bytes)\n",(unsigned int)idx,(unsigned int)(len - 21),(unsigned int)_download.length()); if (idx == (unsigned long)_download.length()) { _download.append(reinterpret_cast(data) + 21,len - 21); if (_download.length() < _downloadLength) { @@ -287,7 +283,6 @@ void SoftwareUpdater::handleSoftwareUpdateUserMessage(uint64_t origin,const void gd.append(_downloadHashPrefix.data,16); gd.append((uint32_t)_download.length()); _node.sendUserMessage((void *)0,ZT_SOFTWARE_UPDATE_SERVICE,ZT_SOFTWARE_UPDATE_USER_MESSAGE_TYPE,gd.data(),gd.size()); - //printf(">> GET_DATA @%u\n",(unsigned int)_download.length()); } } } @@ -334,7 +329,6 @@ bool SoftwareUpdater::check(const uint64_t now) (int)ZT_VENDOR_ZEROTIER, _channel.c_str()); _node.sendUserMessage((void *)0,ZT_SOFTWARE_UPDATE_SERVICE,ZT_SOFTWARE_UPDATE_USER_MESSAGE_TYPE,tmp,len); - //printf(">> GET_LATEST\n"); } if (_latestValid) @@ -360,7 +354,6 @@ bool SoftwareUpdater::check(const uint64_t now) if (OSUtils::writeFile(binPath.c_str(),_download)) { OSUtils::lockDownFile(binPath.c_str(),false); _latestValid = true; - //printf("VALID UPDATE\n%s\n",OSUtils::jsonDump(_latestMeta).c_str()); _download = std::string(); _downloadLength = 0; return true; @@ -370,7 +363,6 @@ bool SoftwareUpdater::check(const uint64_t now) } catch ( ... ) {} // any exception equals verification failure // If we get here, checks failed. - //printf("INVALID UPDATE (!!!)\n%s\n",OSUtils::jsonDump(_latestMeta).c_str()); OSUtils::rm(binPath.c_str()); _latestMeta = nlohmann::json(); _latestValid = false; @@ -382,7 +374,6 @@ bool SoftwareUpdater::check(const uint64_t now) gd.append(_downloadHashPrefix.data,16); gd.append((uint32_t)_download.length()); _node.sendUserMessage((void *)0,ZT_SOFTWARE_UPDATE_SERVICE,ZT_SOFTWARE_UPDATE_USER_MESSAGE_TYPE,gd.data(),gd.size()); - //printf(">> GET_DATA @%u\n",(unsigned int)_download.length()); } }