Drop Sqlite-based Log table for now and switch to an in-memory log for recent activity. Log table gets too big on busy nodes. Should probably support push of events to some kind of event system later.

This commit is contained in:
Adam Ierymenko 2015-09-15 10:59:23 -07:00
parent ef316ced3b
commit 610ab0750c
4 changed files with 77 additions and 89 deletions

View File

@ -210,11 +210,6 @@ SqliteNetworkController::SqliteNetworkController(const char *dbPath) :
||(sqlite3_prepare_v2(_db,"DELETE FROM Gateway WHERE networkId = ?",-1,&_sDeleteGateways,(const char **)0) != SQLITE_OK)
||(sqlite3_prepare_v2(_db,"INSERT INTO Gateway (networkId,\"ip\",ipVersion,metric) VALUES (?,?,?,?)",-1,&_sCreateGateway,(const char **)0) != SQLITE_OK)
/* Log */
||(sqlite3_prepare_v2(_db,"INSERT INTO \"Log\" (networkId,nodeId,\"ts\",\"authorized\",\"version\",fromAddr) VALUES (?,?,?,?,?,?)",-1,&_sPutLog,(const char **)0) != SQLITE_OK)
||(sqlite3_prepare_v2(_db,"SELECT \"ts\",\"authorized\",\"version\",fromAddr FROM \"Log\" WHERE networkId = ? AND nodeId = ? AND \"ts\" >= ? ORDER BY \"ts\" ASC",-1,&_sGetMemberLog,(const char **)0) != SQLITE_OK)
||(sqlite3_prepare_v2(_db,"SELECT \"ts\",\"authorized\",\"version\",fromAddr FROM \"Log\" WHERE networkId = ? AND nodeId = ? ORDER BY \"ts\" DESC LIMIT 10",-1,&_sGetRecentMemberLog,(const char **)0) != SQLITE_OK)
/* Config */
||(sqlite3_prepare_v2(_db,"SELECT \"v\" FROM \"Config\" WHERE \"k\" = ?",-1,&_sGetConfig,(const char **)0) != SQLITE_OK)
||(sqlite3_prepare_v2(_db,"INSERT OR REPLACE INTO \"Config\" (\"k\",\"v\") VALUES (?,?)",-1,&_sSetConfig,(const char **)0) != SQLITE_OK)
@ -296,9 +291,6 @@ SqliteNetworkController::~SqliteNetworkController()
sqlite3_finalize(_sIncrementMemberRevisionCounter);
sqlite3_finalize(_sGetConfig);
sqlite3_finalize(_sSetConfig);
sqlite3_finalize(_sPutLog);
sqlite3_finalize(_sGetMemberLog);
sqlite3_finalize(_sGetRecentMemberLog);
sqlite3_close(_db);
}
}
@ -1074,28 +1066,35 @@ unsigned int SqliteNetworkController::_doCPGet(
responseBody.append("],\n\t\"recentLog\": [");
sqlite3_reset(_sGetRecentMemberLog);
sqlite3_bind_text(_sGetRecentMemberLog,1,nwids,16,SQLITE_STATIC);
sqlite3_bind_text(_sGetRecentMemberLog,2,addrs,10,SQLITE_STATIC);
bool firstLog = true;
while (sqlite3_step(_sGetRecentMemberLog) == SQLITE_ROW) {
responseBody.append(firstLog ? "{" : ",{");
firstLog = false;
responseBody.append("\"ts\":");
responseBody.append(reinterpret_cast<const char *>(sqlite3_column_text(_sGetRecentMemberLog,0)));
responseBody.append((sqlite3_column_int(_sGetRecentMemberLog,1) == 0) ? ",\"authorized\":false,\"version\":" : ",\"authorized\":true,\"version\":");
const char *ver = reinterpret_cast<const char *>(sqlite3_column_text(_sGetRecentMemberLog,2));
if ((ver)&&(ver[0])) {
responseBody.push_back('"');
responseBody.append(_jsonEscape(ver));
responseBody.append("\",\"fromAddr\":");
} else responseBody.append("null,\"fromAddr\":");
const char *fa = reinterpret_cast<const char *>(sqlite3_column_text(_sGetRecentMemberLog,3));
if ((fa)&&(fa[0])) {
responseBody.push_back('"');
responseBody.append(_jsonEscape(fa));
responseBody.append("\"}");
} else responseBody.append("null}");
{
std::map< std::pair<Address,uint64_t>,_LLEntry >::const_iterator lli(_lastLog.find(std::pair<Address,uint64_t>(Address(address),nwid)));
if (lli != _lastLog.end()) {
const _LLEntry &lastLogEntry = lli->second;
uint64_t eptr = lastLogEntry.totalRequests;
for(int k=0;k<ZT_SQLITENETWORKCONTROLLER_IN_MEMORY_LOG_SIZE;++k) {
if (!eptr--)
break;
const unsigned long ptr = (unsigned long)eptr % ZT_SQLITENETWORKCONTROLLER_IN_MEMORY_LOG_SIZE;
char tsbuf[64];
Utils::snprintf(tsbuf,sizeof(tsbuf),"%llu",(unsigned long long)lastLogEntry.l[ptr].ts);
responseBody.append((k == 0) ? "{" : ",{");
responseBody.append("\"ts\":");
responseBody.append(tsbuf);
responseBody.append(lastLogEntry.l[ptr].authorized ? ",\"authorized\":false,\"version\":" : ",\"authorized\":true,\"version\":");
if (lastLogEntry.l[ptr].version[0]) {
responseBody.push_back('"');
responseBody.append(_jsonEscape(lastLogEntry.l[ptr].version));
responseBody.append("\",\"fromAddr\":");
} else responseBody.append("null,\"fromAddr\":");
if (lastLogEntry.l[ptr].fromAddr) {
responseBody.push_back('"');
responseBody.append(_jsonEscape(lastLogEntry.l[ptr].fromAddr.toString()));
responseBody.append("\"}");
} else responseBody.append("null}");
}
}
}
responseBody.append("]\n}\n");
@ -1430,14 +1429,12 @@ NetworkController::ResultCode SqliteNetworkController::_doNetworkConfigRequest(c
return NetworkController::NETCONF_QUERY_INTERNAL_SERVER_ERROR;
}
// Check rate limit
{
uint64_t &lrt = _lastRequestTime[std::pair<Address,uint64_t>(identity.address(),nwid)];
uint64_t lrt2 = lrt;
if (((lrt = OSUtils::now()) - lrt2) <= ZT_NETCONF_MIN_REQUEST_PERIOD)
return NetworkController::NETCONF_QUERY_IGNORE;
}
// Check rate limit circuit breaker to prevent flooding
const uint64_t now = OSUtils::now();
_LLEntry &lastLogEntry = _lastLog[std::pair<Address,uint64_t>(identity.address(),nwid)];
if ((now - lastLogEntry.lastRequestTime) <= ZT_NETCONF_MIN_REQUEST_PERIOD)
return NetworkController::NETCONF_QUERY_IGNORE;
lastLogEntry.lastRequestTime = now;
NetworkRecord network;
memset(&network,0,sizeof(network));
@ -1523,28 +1520,18 @@ NetworkController::ResultCode SqliteNetworkController::_doNetworkConfigRequest(c
sqlite3_step(_sIncrementMemberRevisionCounter);
}
// Add log entry
// Add log entry to in-memory circular log
{
char ver[16];
std::string fa;
if (fromAddr) {
fa = fromAddr.toString();
if (fa.length() > 64)
fa = fa.substr(0,64);
}
sqlite3_reset(_sPutLog);
sqlite3_bind_text(_sPutLog,1,network.id,16,SQLITE_STATIC);
sqlite3_bind_text(_sPutLog,2,member.nodeId,10,SQLITE_STATIC);
sqlite3_bind_int64(_sPutLog,3,(long long)OSUtils::now());
sqlite3_bind_int(_sPutLog,4,member.authorized ? 1 : 0);
if ((clientMajorVersion > 0)||(clientMinorVersion > 0)||(clientRevision > 0)) {
Utils::snprintf(ver,sizeof(ver),"%u.%u.%u",clientMajorVersion,clientMinorVersion,clientRevision);
sqlite3_bind_text(_sPutLog,5,ver,-1,SQLITE_STATIC);
} else sqlite3_bind_null(_sPutLog,5);
if (fa.length() > 0)
sqlite3_bind_text(_sPutLog,6,fa.c_str(),-1,SQLITE_STATIC);
else sqlite3_bind_null(_sPutLog,6);
sqlite3_step(_sPutLog);
const unsigned long ptr = (unsigned long)lastLogEntry.totalRequests % ZT_SQLITENETWORKCONTROLLER_IN_MEMORY_LOG_SIZE;
lastLogEntry.l[ptr].ts = now;
lastLogEntry.l[ptr].fromAddr = fromAddr;
if ((clientMajorVersion > 0)||(clientMinorVersion > 0)||(clientRevision > 0))
Utils::snprintf(lastLogEntry.l[ptr].version,sizeof(lastLogEntry.l[ptr].version),"%u.%u.%u",clientMajorVersion,clientMinorVersion,clientRevision);
else lastLogEntry.l[ptr].version[0] = (char)0;
lastLogEntry.l[ptr].authorized = member.authorized;
++lastLogEntry.totalRequests;
// TODO: push or save these somewhere
}
// Check member authorization

View File

@ -40,6 +40,9 @@
#include "../node/NetworkController.hpp"
#include "../node/Mutex.hpp"
// Number of in-memory last log entries to maintain per user
#define ZT_SQLITENETWORKCONTROLLER_IN_MEMORY_LOG_SIZE 32
namespace ZeroTier {
class SqliteNetworkController : public NetworkController
@ -104,7 +107,34 @@ private:
std::string _dbPath;
std::string _instanceId;
std::map< std::pair<Address,uint64_t>,uint64_t > _lastRequestTime;
// A circular buffer last log
struct _LLEntry
{
_LLEntry()
{
for(long i=0;i<ZT_SQLITENETWORKCONTROLLER_IN_MEMORY_LOG_SIZE;++i)
this->l[i].ts = 0;
this->lastRequestTime = 0;
this->totalRequests = 0;
}
// Circular buffer of last log entries
struct {
uint64_t ts; // timestamp or 0 if circular buffer entry unused
char version[64];
InetAddress fromAddr;
bool authorized;
} l[ZT_SQLITENETWORKCONTROLLER_IN_MEMORY_LOG_SIZE];
// Time of last request whether successful or not
uint64_t lastRequestTime;
// Total requests by this address / network ID pair (also serves mod IN_MEMORY_LOG_SIZE as circular buffer ptr)
uint64_t totalRequests;
};
// Last log entries by address and network ID pair
std::map< std::pair<Address,uint64_t>,_LLEntry > _lastLog;
sqlite3 *_db;
@ -151,9 +181,6 @@ private:
sqlite3_stmt *_sIncrementMemberRevisionCounter;
sqlite3_stmt *_sGetConfig;
sqlite3_stmt *_sSetConfig;
sqlite3_stmt *_sPutLog;
sqlite3_stmt *_sGetMemberLog;
sqlite3_stmt *_sGetRecentMemberLog;
Mutex _lock;
};

View File

@ -77,19 +77,6 @@ CREATE TABLE Member (
CREATE INDEX Member_networkId_activeBridge ON Member(networkId, activeBridge);
CREATE INDEX Member_networkId_memberRevision ON Member(networkId, memberRevision);
CREATE TABLE Log (
networkId char(16) NOT NULL,
nodeId char(10) NOT NULL,
ts integer NOT NULL,
authorized integer NOT NULL,
authTokenId integer,
version varchar(16),
fromAddr varchar(64)
);
CREATE INDEX Log_networkId_nodeId ON Log(networkId, nodeId);
CREATE INDEX Log_ts ON Log(ts);
CREATE TABLE Relay (
networkId char(16) NOT NULL REFERENCES Network(id) ON DELETE CASCADE,
address char(10) NOT NULL,

View File

@ -78,19 +78,6 @@
"CREATE INDEX Member_networkId_activeBridge ON Member(networkId, activeBridge);\n"\
"CREATE INDEX Member_networkId_memberRevision ON Member(networkId, memberRevision);\n"\
"\n"\
"CREATE TABLE Log (\n"\
" networkId char(16) NOT NULL,\n"\
" nodeId char(10) NOT NULL,\n"\
" ts integer NOT NULL,\n"\
" authorized integer NOT NULL,\n"\
" authTokenId integer,\n"\
" version varchar(16),\n"\
" fromAddr varchar(64)\n"\
");\n"\
"\n"\
"CREATE INDEX Log_networkId_nodeId ON Log(networkId, nodeId);\n"\
"CREATE INDEX Log_ts ON Log(ts);\n"\
"\n"\
"CREATE TABLE Relay (\n"\
" networkId char(16) NOT NULL REFERENCES Network(id) ON DELETE CASCADE,\n"\
" address char(10) NOT NULL,\n"\