From a577b8d3816069a448a946302048a377b55cd74a Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 1 Mar 2017 16:33:34 -0800 Subject: [PATCH] Update how controller handles circuit tests -- save results to filesystem. --- controller/EmbeddedNetworkController.cpp | 137 +++++++++++------------ controller/EmbeddedNetworkController.hpp | 15 +-- controller/JSONDB.cpp | 25 +++-- controller/JSONDB.hpp | 6 +- controller/README.md | 11 +- node/Peer.cpp | 6 +- service/OneService.cpp | 2 +- 7 files changed, 100 insertions(+), 102 deletions(-) diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 0e57fc14d..7915765b4 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -428,9 +428,9 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) return false; } -EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath,FILE *feed) : +EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) : _threadsStarted(false), - _db(dbPath,feed), + _db(dbPath), _node(node) { OSUtils::mkdir(dbPath); @@ -546,21 +546,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( return 200; } - } else if ((path[2] == "test")&&(path.size() >= 4)) { - - Mutex::Lock _l(_circuitTests_m); - std::map< uint64_t,_CircuitTestEntry >::iterator cte(_circuitTests.find(Utils::hexStrToU64(path[3].c_str()))); - if ((cte != _circuitTests.end())&&(cte->second.test)) { - - responseBody = "["; - responseBody.append(cte->second.jsonResults); - responseBody.push_back(']'); - responseContentType = "application/json"; - - return 200; - - } // else 404 - } // else 404 } else { @@ -755,9 +740,10 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( return 200; } else if ((path.size() == 3)&&(path[2] == "test")) { - Mutex::Lock _l(_circuitTests_m); + Mutex::Lock _l(_tests_m); - ZT_CircuitTest *test = (ZT_CircuitTest *)malloc(sizeof(ZT_CircuitTest)); + _tests.push_back(ZT_CircuitTest()); + ZT_CircuitTest *const test = &(_tests.back()); memset(test,0,sizeof(ZT_CircuitTest)); Utils::getSecureRandom(&(test->testId),sizeof(test->testId)); @@ -781,7 +767,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( test->reportAtEveryHop = (OSUtils::jsonBool(b["reportAtEveryHop"],true) ? 1 : 0); if (!test->hopCount) { - ::free((void *)test); + _tests.pop_back(); responseBody = "{ \"message\": \"a test must contain at least one hop\" }"; responseContentType = "application/json"; return 400; @@ -789,18 +775,18 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( test->timestamp = OSUtils::now(); - _CircuitTestEntry &te = _circuitTests[test->testId]; - te.test = test; - te.jsonResults = ""; - - if (_node) + if (_node) { _node->circuitTestBegin(test,&(EmbeddedNetworkController::_circuitTestCallback)); - else return 500; + } else { + _tests.pop_back(); + return 500; + } char json[1024]; Utils::snprintf(json,sizeof(json),"{\"testId\":\"%.16llx\"}",test->testId); responseBody = json; responseContentType = "application/json"; + return 200; } // else 404 @@ -1137,62 +1123,67 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE( void EmbeddedNetworkController::threadMain() throw() { + uint64_t lastCircuitTestCheck = 0; for(;;) { - _RQEntry *const qe = _queue.get(); + _RQEntry *const qe = _queue.get(); // waits on next request if (!qe) break; // enqueue a NULL to terminate threads try { _request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData); } catch ( ... ) {} delete qe; + + uint64_t now = OSUtils::now(); + if ((now - lastCircuitTestCheck) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) { + lastCircuitTestCheck = now; + Mutex::Lock _l(_tests_m); + for(std::list< ZT_CircuitTest >::iterator i(_tests.begin());i!=_tests.end();) { + if ((now - i->timestamp) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) { + _node->circuitTestEnd(&(*i)); + _tests.erase(i++); + } else ++i; + } + } } } void EmbeddedNetworkController::_circuitTestCallback(ZT_Node *node,ZT_CircuitTest *test,const ZT_CircuitTestReport *report) { - char tmp[65535]; + char tmp[1024],id[128]; EmbeddedNetworkController *const self = reinterpret_cast(test->ptr); - if (!test) - return; - if (!report) - return; - - Mutex::Lock _l(self->_circuitTests_m); - std::map< uint64_t,_CircuitTestEntry >::iterator cte(self->_circuitTests.find(test->testId)); - - if (cte == self->_circuitTests.end()) { // sanity check: a circuit test we didn't launch? - self->_node->circuitTestEnd(test); - ::free((void *)test); - return; - } + if ((!test)||(!report)||(!test->credentialNetworkId)) return; // sanity check + const uint64_t now = OSUtils::now(); + Utils::snprintf(id,sizeof(id),"network/%.16llx/test/%.16llx-%.16llx-%.10llx-%.10llx",test->credentialNetworkId,test->testId,now,report->upstream,report->current); Utils::snprintf(tmp,sizeof(tmp), - "%s{\n" - "\t\"timestamp\": %llu," ZT_EOL_S - "\t\"testId\": \"%.16llx\"," ZT_EOL_S - "\t\"upstream\": \"%.10llx\"," ZT_EOL_S - "\t\"current\": \"%.10llx\"," ZT_EOL_S - "\t\"receivedTimestamp\": %llu," ZT_EOL_S - "\t\"sourcePacketId\": \"%.16llx\"," ZT_EOL_S - "\t\"flags\": %llu," ZT_EOL_S - "\t\"sourcePacketHopCount\": %u," ZT_EOL_S - "\t\"errorCode\": %u," ZT_EOL_S - "\t\"vendor\": %d," ZT_EOL_S - "\t\"protocolVersion\": %u," ZT_EOL_S - "\t\"majorVersion\": %u," ZT_EOL_S - "\t\"minorVersion\": %u," ZT_EOL_S - "\t\"revision\": %u," ZT_EOL_S - "\t\"platform\": %d," ZT_EOL_S - "\t\"architecture\": %d," ZT_EOL_S - "\t\"receivedOnLocalAddress\": \"%s\"," ZT_EOL_S - "\t\"receivedFromRemoteAddress\": \"%s\"" ZT_EOL_S - "}", - ((cte->second.jsonResults.length() > 0) ? ",\n" : ""), - (unsigned long long)report->timestamp, + "{\"id\": \"%s\"," + "\"timestamp\": %llu," + "\"networkId\": \"%.16llx\"," + "\"testId\": \"%.16llx\"," + "\"upstream\": \"%.10llx\"," + "\"current\": \"%.10llx\"," + "\"receivedTimestamp\": %llu," + "\"sourcePacketId\": \"%.16llx\"," + "\"flags\": %llu," + "\"sourcePacketHopCount\": %u," + "\"errorCode\": %u," + "\"vendor\": %d," + "\"protocolVersion\": %u," + "\"majorVersion\": %u," + "\"minorVersion\": %u," + "\"revision\": %u," + "\"platform\": %d," + "\"architecture\": %d," + "\"receivedOnLocalAddress\": \"%s\"," + "\"receivedFromRemoteAddress\": \"%s\"," + "\"receivedFromLinkQuality\": %f}", + id + 30, // last bit only, not leading path + (unsigned long long)test->timestamp, + (unsigned long long)test->credentialNetworkId, (unsigned long long)test->testId, (unsigned long long)report->upstream, (unsigned long long)report->current, - (unsigned long long)OSUtils::now(), + (unsigned long long)now, (unsigned long long)report->sourcePacketId, (unsigned long long)report->flags, report->sourcePacketHopCount, @@ -1205,9 +1196,11 @@ void EmbeddedNetworkController::_circuitTestCallback(ZT_Node *node,ZT_CircuitTes (int)report->platform, (int)report->architecture, reinterpret_cast(&(report->receivedOnLocalAddress))->toString().c_str(), - reinterpret_cast(&(report->receivedFromRemoteAddress))->toString().c_str()); + reinterpret_cast(&(report->receivedFromRemoteAddress))->toString().c_str(), + ((double)report->receivedFromLinkQuality / (double)ZT_PATH_LINK_QUALITY_MAX)); - cte->second.jsonResults.append(tmp); + Mutex::Lock _l(self->_db_m); + self->_db.writeRaw(id,std::string(tmp)); } void EmbeddedNetworkController::_request( @@ -1354,12 +1347,12 @@ void EmbeddedNetworkController::_request( if (requestPacketId) { // only log if this is a request, not for generated pushes json rlEntry = json::object(); rlEntry["ts"] = now; - rlEntry["authorized"] = (authorizedBy) ? true : false; - rlEntry["authorizedBy"] = (authorizedBy) ? authorizedBy : ""; - rlEntry["clientMajorVersion"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MAJOR_VERSION,0); - rlEntry["clientMinorVersion"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MINOR_VERSION,0); - rlEntry["clientRevision"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_REVISION,0); - rlEntry["clientProtocolVersion"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_PROTOCOL_VERSION,0); + rlEntry["auth"] = (authorizedBy) ? true : false; + rlEntry["authBy"] = (authorizedBy) ? authorizedBy : ""; + rlEntry["vMajor"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MAJOR_VERSION,0); + rlEntry["vMinor"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MINOR_VERSION,0); + rlEntry["vRev"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_REVISION,0); + rlEntry["vProto"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_PROTOCOL_VERSION,0); if (fromAddr) rlEntry["fromAddr"] = fromAddr.toString(); diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 3e39eaf5e..ab7cdd531 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -46,6 +46,9 @@ // Number of background threads to start -- not actually started until needed #define ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT 2 +// TTL for circuit tests +#define ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION 120000 + namespace ZeroTier { class Node; @@ -56,9 +59,8 @@ public: /** * @param node Parent node * @param dbPath Path to store data - * @param feed FILE to send feed of all data and changes to (zero-delimited JSON objects) or NULL for none */ - EmbeddedNetworkController(Node *node,const char *dbPath,FILE *feed); + EmbeddedNetworkController(Node *node,const char *dbPath); virtual ~EmbeddedNetworkController(); virtual void init(const Identity &signingId,Sender *sender); @@ -199,13 +201,8 @@ private: NetworkController::Sender *_sender; Identity _signingId; - struct _CircuitTestEntry - { - ZT_CircuitTest *test; - std::string jsonResults; - }; - std::map< uint64_t,_CircuitTestEntry > _circuitTests; - Mutex _circuitTests_m; + std::list< ZT_CircuitTest > _tests; + Mutex _tests_m; std::map< std::pair,uint64_t > _lastRequestTime; Mutex _lastRequestTime_m; diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp index 044f791cf..1277aabb7 100644 --- a/controller/JSONDB.cpp +++ b/controller/JSONDB.cpp @@ -22,6 +22,22 @@ namespace ZeroTier { static const nlohmann::json _EMPTY_JSON(nlohmann::json::object()); +bool JSONDB::writeRaw(const std::string &n,const std::string &obj) +{ + if (!_isValidObjectName(n)) + return false; + + const std::string path(_genPath(n,true)); + if (!path.length()) + return false; + + const std::string buf(obj); + if (!OSUtils::writeFile(path.c_str(),buf)) + return false; + + return true; +} + bool JSONDB::put(const std::string &n,const nlohmann::json &obj) { if (!_isValidObjectName(n)) @@ -35,9 +51,6 @@ bool JSONDB::put(const std::string &n,const nlohmann::json &obj) if (!OSUtils::writeFile(path.c_str(),buf)) return false; - if (_feed) - fwrite(buf.c_str(),buf.length()+1,1,_feed); - _E &e = _db[n]; e.obj = obj; e.lastModifiedOnDisk = OSUtils::getLastModified(path.c_str()); @@ -72,9 +85,6 @@ const nlohmann::json &JSONDB::get(const std::string &n,unsigned long maxSinceChe e->second.obj = OSUtils::jsonParse(buf); e->second.lastModifiedOnDisk = lm; // don't update these if there is a parse error -- try again and again ASAP e->second.lastCheck = now; - - if (_feed) - fwrite(buf.c_str(),buf.length()+1,1,_feed); // it changed, so send to feed (also sends all objects on startup, which we want for Central) } catch ( ... ) {} // parse errors result in "holding pattern" behavior } } @@ -99,9 +109,6 @@ const nlohmann::json &JSONDB::get(const std::string &n,unsigned long maxSinceChe e2.lastModifiedOnDisk = lm; e2.lastCheck = now; - if (_feed) - fwrite(buf.c_str(),buf.length()+1,1,_feed); - return e2.obj; } } diff --git a/controller/JSONDB.hpp b/controller/JSONDB.hpp index 401136558..5b7c5e50d 100644 --- a/controller/JSONDB.hpp +++ b/controller/JSONDB.hpp @@ -42,8 +42,7 @@ namespace ZeroTier { class JSONDB { public: - JSONDB(const std::string &basePath,FILE *feed) : - _feed(feed), + JSONDB(const std::string &basePath) : _basePath(basePath) { _reload(_basePath); @@ -55,6 +54,8 @@ public: _reload(_basePath); } + bool writeRaw(const std::string &n,const std::string &obj); + bool put(const std::string &n,const nlohmann::json &obj); inline bool put(const std::string &n1,const std::string &n2,const nlohmann::json &obj) { return this->put((n1 + "/" + n2),obj); } @@ -108,7 +109,6 @@ private: inline bool operator!=(const _E &e) const { return (obj != e.obj); } }; - FILE *_feed; std::string _basePath; std::map _db; }; diff --git a/controller/README.md b/controller/README.md index 093300a69..db8d01533 100644 --- a/controller/README.md +++ b/controller/README.md @@ -237,11 +237,12 @@ Note that managed IP assignments are only used if they fall within a managed rou | Field | Type | Description | | --------------------- | ------------- | ------------------------------------------------- | | ts | integer | Time of request, ms since epoch | -| authorized | boolean | Was member authorized? | -| clientMajorVersion | integer | Client major version or -1 if unknown | -| clientMinorVersion | integer | Client minor version or -1 if unknown | -| clientRevision | integer | Client revision or -1 if unknown | -| clientProtocolVersion | integer | ZeroTier protocol version reported by client | +| auth | boolean | Was member authorized? | +| authBy | string | How was member authorized? | +| vMajor | integer | Client major version or -1 if unknown | +| vMinor | integer | Client minor version or -1 if unknown | +| vRev | integer | Client revision or -1 if unknown | +| vProto | integer | ZeroTier protocol version reported by client | | fromAddr | string | Physical address if known | The controller can only know a member's `fromAddr` if it's able to establish a direct path to it. Members behind very restrictive firewalls may not have this information since the controller will be receiving the member's requests by way of a relay. ZeroTier does not back-trace IP paths as packets are relayed since this would add a lot of protocol overhead. diff --git a/node/Peer.cpp b/node/Peer.cpp index 1dde8b65b..fa3ce6c82 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -141,10 +141,10 @@ void Peer::received( path->trustedPacketReceived(now); } - if (hops == 0) { - if (_vProto >= 9) - path->updateLinkQuality((unsigned int)(packetId & 7)); + if (_vProto >= 9) + path->updateLinkQuality((unsigned int)(packetId & 7)); + if (hops == 0) { bool pathIsConfirmed = false; { Mutex::Lock _l(_paths_m); diff --git a/service/OneService.cpp b/service/OneService.cpp index 81950e264..8d8856a26 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -638,7 +638,7 @@ public: return _termReason; } - _controller = new EmbeddedNetworkController(_node,(_homePath + ZT_PATH_SEPARATOR_S ZT_CONTROLLER_DB_PATH).c_str(),(FILE *)0); + _controller = new EmbeddedNetworkController(_node,(_homePath + ZT_PATH_SEPARATOR_S ZT_CONTROLLER_DB_PATH).c_str()); _node->setNetconfMaster((void *)_controller); #ifdef ZT_ENABLE_CLUSTER