Automatic periodic status dump from controller.

This commit is contained in:
Adam Ierymenko 2017-07-18 15:36:33 -07:00
parent ae65eb5105
commit 31785f7f6e
3 changed files with 52 additions and 52 deletions

View File

@ -429,6 +429,7 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule)
EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) : EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) :
_startTime(OSUtils::now()), _startTime(OSUtils::now()),
_running(true), _running(true),
_lastDumpedStatus(0),
_db(dbPath), _db(dbPath),
_node(node) _node(node)
{ {
@ -1010,20 +1011,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
} // else 404 } // else 404
} else if (path[0] == "ping") {
_startThreads();
_RQEntry *qe = new _RQEntry;
qe->type = _RQEntry::RQENTRY_TYPE_PING;
_queue.post(qe);
char tmp[64];
OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"clock\":%llu,\"ping\":%s}",(unsigned long long)now,OSUtils::jsonDump(b).c_str());
responseBody = tmp;
responseContentType = "application/json";
return 200;
} }
return 404; return 404;
@ -1088,8 +1075,9 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
{ {
static volatile unsigned long idCounter = 0; static volatile unsigned long idCounter = 0;
char id[128]; char id[128];
std::string k,v;
try { try {
std::vector<uint64_t> nw4m(_db.networksForMember(rt.origin)); std::vector<uint64_t> nw4m(_db.networksForMember(rt.origin));
@ -1103,29 +1091,19 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
for(char *l=Utils::stok(rt.data,"\n",&saveptr);(l);l=Utils::stok((char *)0,"\n",&saveptr)) { for(char *l=Utils::stok(rt.data,"\n",&saveptr);(l);l=Utils::stok((char *)0,"\n",&saveptr)) {
char *eq = strchr(l,'='); char *eq = strchr(l,'=');
if (eq > l) { if (eq > l) {
std::string k(l,(unsigned long)(eq - l)); k.assign(l,(unsigned long)(eq - l));
std::string v; v.clear();
++eq; ++eq;
while (*eq) { while (*eq) {
if (*eq == '\\') { if (*eq == '\\') {
++eq; ++eq;
if (*eq) { if (*eq) {
switch(*eq) { switch(*eq) {
case 'r': case 'r': v.push_back('\r'); break;
v.push_back('\r'); case 'n': v.push_back('\n'); break;
break; case '0': v.push_back((char)0); break;
case 'n': case 'e': v.push_back('='); break;
v.push_back('\n'); default: v.push_back(*eq); break;
break;
case '0':
v.push_back((char)0);
break;
case 'e':
v.push_back('=');
break;
default:
v.push_back(*eq);
break;
} }
++eq; ++eq;
} }
@ -1133,7 +1111,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
v.push_back(*(eq++)); v.push_back(*(eq++));
} }
} }
if (v.length() > 0) if ((k.length() > 0)&&(v.length() > 0))
d[k] = v; d[k] = v;
} }
} }
@ -1177,20 +1155,23 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
void EmbeddedNetworkController::threadMain() void EmbeddedNetworkController::threadMain()
throw() throw()
{ {
char tmp[256];
_RQEntry *qe = (_RQEntry *)0; _RQEntry *qe = (_RQEntry *)0;
while ((_running)&&(_queue.get(qe))) { while ((_running)&&(_queue.get(qe,1000) != BlockingQueue<_RQEntry *>::STOP)) {
try { try {
if (qe->type == _RQEntry::RQENTRY_TYPE_REQUEST) { if (qe->type == _RQEntry::RQENTRY_TYPE_REQUEST)
_request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData); _request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData);
} else if (qe->type == _RQEntry::RQENTRY_TYPE_PING) {
// Every 10s we update a 'status' containing member online state, etc.
const uint64_t now = OSUtils::now(); const uint64_t now = OSUtils::now();
if ((now - _lastDumpedStatus) >= 10000) {
_lastDumpedStatus = now;
bool first = true; bool first = true;
std::string pong("{\"memberStatus\":{"); std::string st("{\"id\":\"status\",\"objtype\":\"status\",\"memberStatus\":{");
{ {
Mutex::Lock _l(_memberStatus_m); Mutex::Lock _l(_memberStatus_m);
pong.reserve(48 * (_memberStatus.size() + 1)); st.reserve(48 * (_memberStatus.size() + 1));
_db.eachId([this,&pong,&now,&first](uint64_t networkId,uint64_t nodeId) { _db.eachId([this,&st,&now,&first,&tmp](uint64_t networkId,uint64_t nodeId) {
char tmp[64];
uint64_t lrt = 0ULL; uint64_t lrt = 0ULL;
auto ms = this->_memberStatus.find(_MemberStatusKey(networkId,nodeId)); auto ms = this->_memberStatus.find(_MemberStatusKey(networkId,nodeId));
if (ms != _memberStatus.end()) if (ms != _memberStatus.end())
@ -1200,14 +1181,13 @@ void EmbeddedNetworkController::threadMain()
(unsigned long long)networkId, (unsigned long long)networkId,
(unsigned long long)nodeId, (unsigned long long)nodeId,
(unsigned long long)lrt); (unsigned long long)lrt);
pong.append(tmp); st.append(tmp);
first = false; first = false;
}); });
} }
char tmp2[256]; OSUtils::ztsnprintf(tmp,sizeof(tmp),"},\"clock\":%llu,\"startTime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime);
OSUtils::ztsnprintf(tmp2,sizeof(tmp2),"},\"clock\":%llu,\"startTime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime); st.append(tmp);
pong.append(tmp2); _db.writeRaw("status",st);
_db.writeRaw("pong",pong);
} }
} catch ( ... ) {} } catch ( ... ) {}
delete qe; delete qe;

View File

@ -105,8 +105,7 @@ private:
Identity identity; Identity identity;
Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> metaData; Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> metaData;
enum { enum {
RQENTRY_TYPE_REQUEST = 0, RQENTRY_TYPE_REQUEST = 0
RQENTRY_TYPE_PING = 1
} type; } type;
}; };
@ -210,6 +209,7 @@ private:
volatile bool _running; volatile bool _running;
BlockingQueue<_RQEntry *> _queue; BlockingQueue<_RQEntry *> _queue;
std::vector<Thread> _threads; std::vector<Thread> _threads;
volatile uint64_t _lastDumpedStatus;
Mutex _threads_m; Mutex _threads_m;
JSONDB _db; JSONDB _db;

View File

@ -30,6 +30,7 @@
#include <queue> #include <queue>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <chrono>
namespace ZeroTier { namespace ZeroTier {
@ -58,10 +59,6 @@ public:
c.notify_all(); c.notify_all();
} }
/**
* @param value Value to set to next queue item if return value is true
* @return False if stop() has been called, true otherwise
*/
inline bool get(T &value) inline bool get(T &value)
{ {
std::unique_lock<std::mutex> lock(m); std::unique_lock<std::mutex> lock(m);
@ -75,6 +72,29 @@ public:
return true; return true;
} }
enum TimedWaitResult
{
OK,
TIMED_OUT,
STOP
};
inline TimedWaitResult get(T &value,const unsigned long ms)
{
const std::chrono::milliseconds ms2{ms};
std::unique_lock<std::mutex> lock(m);
if (!r) return STOP;
while (q.empty()) {
if (c.wait_for(lock,ms2) == std::cv_status::timeout)
return ((r) ? TIMED_OUT : STOP);
else if (!r)
return STOP;
}
value = q.front();
q.pop();
return OK;
}
private: private:
volatile bool r; volatile bool r;
std::queue<T> q; std::queue<T> q;