From 31785f7f6ec27e826efc3cc2b45979e5d58f37bb Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 18 Jul 2017 15:36:33 -0700 Subject: [PATCH] Automatic periodic status dump from controller. --- controller/EmbeddedNetworkController.cpp | 72 +++++++++--------------- controller/EmbeddedNetworkController.hpp | 4 +- osdep/BlockingQueue.hpp | 28 +++++++-- 3 files changed, 52 insertions(+), 52 deletions(-) diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 2caec827f..cb0c05af1 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -429,6 +429,7 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) : _startTime(OSUtils::now()), _running(true), + _lastDumpedStatus(0), _db(dbPath), _node(node) { @@ -1010,20 +1011,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( } // else 404 - } else if (path[0] == "ping") { - - _startThreads(); - _RQEntry *qe = new _RQEntry; - qe->type = _RQEntry::RQENTRY_TYPE_PING; - _queue.post(qe); - - char tmp[64]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"clock\":%llu,\"ping\":%s}",(unsigned long long)now,OSUtils::jsonDump(b).c_str()); - responseBody = tmp; - responseContentType = "application/json"; - - return 200; - } return 404; @@ -1088,8 +1075,9 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE( void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) { static volatile unsigned long idCounter = 0; - char id[128]; + std::string k,v; + try { std::vector nw4m(_db.networksForMember(rt.origin)); @@ -1103,29 +1091,19 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) for(char *l=Utils::stok(rt.data,"\n",&saveptr);(l);l=Utils::stok((char *)0,"\n",&saveptr)) { char *eq = strchr(l,'='); if (eq > l) { - std::string k(l,(unsigned long)(eq - l)); - std::string v; + k.assign(l,(unsigned long)(eq - l)); + v.clear(); ++eq; while (*eq) { if (*eq == '\\') { ++eq; if (*eq) { switch(*eq) { - case 'r': - v.push_back('\r'); - break; - case 'n': - v.push_back('\n'); - break; - case '0': - v.push_back((char)0); - break; - case 'e': - v.push_back('='); - break; - default: - v.push_back(*eq); - break; + case 'r': v.push_back('\r'); break; + case 'n': v.push_back('\n'); break; + case '0': v.push_back((char)0); break; + case 'e': v.push_back('='); break; + default: v.push_back(*eq); break; } ++eq; } @@ -1133,7 +1111,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) v.push_back(*(eq++)); } } - if (v.length() > 0) + if ((k.length() > 0)&&(v.length() > 0)) d[k] = v; } } @@ -1177,20 +1155,23 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) void EmbeddedNetworkController::threadMain() throw() { + char tmp[256]; _RQEntry *qe = (_RQEntry *)0; - while ((_running)&&(_queue.get(qe))) { + while ((_running)&&(_queue.get(qe,1000) != BlockingQueue<_RQEntry *>::STOP)) { try { - if (qe->type == _RQEntry::RQENTRY_TYPE_REQUEST) { + if (qe->type == _RQEntry::RQENTRY_TYPE_REQUEST) _request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData); - } else if (qe->type == _RQEntry::RQENTRY_TYPE_PING) { - const uint64_t now = OSUtils::now(); + + // Every 10s we update a 'status' containing member online state, etc. + const uint64_t now = OSUtils::now(); + if ((now - _lastDumpedStatus) >= 10000) { + _lastDumpedStatus = now; bool first = true; - std::string pong("{\"memberStatus\":{"); + std::string st("{\"id\":\"status\",\"objtype\":\"status\",\"memberStatus\":{"); { Mutex::Lock _l(_memberStatus_m); - pong.reserve(48 * (_memberStatus.size() + 1)); - _db.eachId([this,&pong,&now,&first](uint64_t networkId,uint64_t nodeId) { - char tmp[64]; + st.reserve(48 * (_memberStatus.size() + 1)); + _db.eachId([this,&st,&now,&first,&tmp](uint64_t networkId,uint64_t nodeId) { uint64_t lrt = 0ULL; auto ms = this->_memberStatus.find(_MemberStatusKey(networkId,nodeId)); if (ms != _memberStatus.end()) @@ -1200,14 +1181,13 @@ void EmbeddedNetworkController::threadMain() (unsigned long long)networkId, (unsigned long long)nodeId, (unsigned long long)lrt); - pong.append(tmp); + st.append(tmp); first = false; }); } - char tmp2[256]; - OSUtils::ztsnprintf(tmp2,sizeof(tmp2),"},\"clock\":%llu,\"startTime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime); - pong.append(tmp2); - _db.writeRaw("pong",pong); + OSUtils::ztsnprintf(tmp,sizeof(tmp),"},\"clock\":%llu,\"startTime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime); + st.append(tmp); + _db.writeRaw("status",st); } } catch ( ... ) {} delete qe; diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index c2dc55b51..8752922ed 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -105,8 +105,7 @@ private: Identity identity; Dictionary metaData; enum { - RQENTRY_TYPE_REQUEST = 0, - RQENTRY_TYPE_PING = 1 + RQENTRY_TYPE_REQUEST = 0 } type; }; @@ -210,6 +209,7 @@ private: volatile bool _running; BlockingQueue<_RQEntry *> _queue; std::vector _threads; + volatile uint64_t _lastDumpedStatus; Mutex _threads_m; JSONDB _db; diff --git a/osdep/BlockingQueue.hpp b/osdep/BlockingQueue.hpp index 43ae74350..5e1a24ef4 100644 --- a/osdep/BlockingQueue.hpp +++ b/osdep/BlockingQueue.hpp @@ -30,6 +30,7 @@ #include #include #include +#include namespace ZeroTier { @@ -58,10 +59,6 @@ public: c.notify_all(); } - /** - * @param value Value to set to next queue item if return value is true - * @return False if stop() has been called, true otherwise - */ inline bool get(T &value) { std::unique_lock lock(m); @@ -75,6 +72,29 @@ public: return true; } + enum TimedWaitResult + { + OK, + TIMED_OUT, + STOP + }; + + inline TimedWaitResult get(T &value,const unsigned long ms) + { + const std::chrono::milliseconds ms2{ms}; + std::unique_lock lock(m); + if (!r) return STOP; + while (q.empty()) { + if (c.wait_for(lock,ms2) == std::cv_status::timeout) + return ((r) ? TIMED_OUT : STOP); + else if (!r) + return STOP; + } + value = q.front(); + q.pop(); + return OK; + } + private: volatile bool r; std::queue q;