Retire RethinkDB, simple receive path multithreading.

This commit is contained in:
Adam Ierymenko 2018-11-11 22:35:15 -08:00
parent 296e4616cc
commit c9c17eaddd
4 changed files with 79 additions and 0 deletions

View File

@ -146,10 +146,13 @@ private:
Identity _signingId; Identity _signingId;
std::string _signingIdAddressString; std::string _signingIdAddressString;
NetworkController::Sender *_sender; NetworkController::Sender *_sender;
std::unique_ptr<DB> _db; std::unique_ptr<DB> _db;
BlockingQueue< _RQEntry * > _queue; BlockingQueue< _RQEntry * > _queue;
std::vector<std::thread> _threads; std::vector<std::thread> _threads;
std::mutex _threads_l; std::mutex _threads_l;
std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus; std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus;
std::mutex _memberStatus_l; std::mutex _memberStatus_l;
}; };

View File

@ -34,6 +34,9 @@
#include <vector> #include <vector>
#include <algorithm> #include <algorithm>
#include <list> #include <list>
#include <thread>
#include <mutex>
#include <condition_variable>
#include "../version.h" #include "../version.h"
#include "../include/ZeroTierOne.h" #include "../include/ZeroTierOne.h"
@ -434,6 +437,8 @@ struct TcpConnection
Mutex writeq_m; Mutex writeq_m;
}; };
#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 4
class OneServiceImpl : public OneService class OneServiceImpl : public OneService
{ {
public: public:
@ -459,6 +464,18 @@ public:
unsigned int _tertiaryPort; unsigned int _tertiaryPort;
volatile unsigned int _udpPortPickerCounter; volatile unsigned int _udpPortPickerCounter;
#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
struct {
uint8_t data[2048];
std::thread thr;
int64_t sock;
struct sockaddr_storage from;
int size;
std::condition_variable cond;
std::mutex lock;
} _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE];
#endif
// Local configuration and memo-ized information from it // Local configuration and memo-ized information from it
json _localConfig; json _localConfig;
Hashtable< uint64_t,std::vector<InetAddress> > _v4Hints; Hashtable< uint64_t,std::vector<InetAddress> > _v4Hints;
@ -587,6 +604,39 @@ public:
_ports[0] = 0; _ports[0] = 0;
_ports[1] = 0; _ports[1] = 0;
_ports[2] = 0; _ports[2] = 0;
#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
_incomingPacketWorker[tn].thr = std::thread([this,tn]() {
std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock);
for(;;) {
_incomingPacketWorker[tn].cond.wait(l);
if (_incomingPacketWorker[tn].size < 0) {
break;
} else if (_incomingPacketWorker[tn].size > 0) {
const ZT_ResultCode rc = _node->processWirePacket(
(void *)0,
OSUtils::now(),
_incomingPacketWorker[tn].sock,
&(_incomingPacketWorker[tn].from),
_incomingPacketWorker[tn].data,
(unsigned int)_incomingPacketWorker[tn].size,
&_nextBackgroundTaskDeadline);
if (ZT_ResultCode_isFatal(rc)) {
char tmp[256];
OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
break;
}
}
}
});
}
#endif
#if ZT_VAULT_SUPPORT #if ZT_VAULT_SUPPORT
curl_global_init(CURL_GLOBAL_DEFAULT); curl_global_init(CURL_GLOBAL_DEFAULT);
#endif #endif
@ -594,6 +644,17 @@ public:
virtual ~OneServiceImpl() virtual ~OneServiceImpl()
{ {
#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
_incomingPacketWorker[tn].lock.lock();
_incomingPacketWorker[tn].size = -1;
_incomingPacketWorker[tn].lock.unlock();
_incomingPacketWorker[tn].cond.notify_all();
}
for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
_incomingPacketWorker[tn].thr.join();
}
#endif
_binder.closeAll(_phy); _binder.closeAll(_phy);
_phy.close(_localControlSocket4); _phy.close(_localControlSocket4);
_phy.close(_localControlSocket6); _phy.close(_localControlSocket6);
@ -1840,6 +1901,20 @@ public:
{ {
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
_lastDirectReceiveFromGlobal = OSUtils::now(); _lastDirectReceiveFromGlobal = OSUtils::now();
#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
unsigned long cksum = 0;
for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) {
cksum += ((uint8_t *)from)[i];
}
const unsigned long tn = cksum % ZT_INCOMING_PACKET_THREAD_POOL_SIZE;
_incomingPacketWorker[tn].lock.lock();
memcpy(_incomingPacketWorker[tn].data,data,len);
_incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock);
memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
_incomingPacketWorker[tn].size = (int)len;
_incomingPacketWorker[tn].lock.unlock();
_incomingPacketWorker[tn].cond.notify_all();
#else
const ZT_ResultCode rc = _node->processWirePacket( const ZT_ResultCode rc = _node->processWirePacket(
(void *)0, (void *)0,
OSUtils::now(), OSUtils::now(),
@ -1856,6 +1931,7 @@ public:
_fatalErrorMessage = tmp; _fatalErrorMessage = tmp;
this->terminate(); this->terminate();
} }
#endif
} }
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)