diff --git a/service/OneService.cpp b/service/OneService.cpp index effe90c20..ad5680c2c 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -174,8 +174,8 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; } // TCP activity timeout #define ZT_TCP_ACTIVITY_TIMEOUT 60000 -// Number of receive path threads to start -#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 8 +// Max number of packet handler threads to start +#define ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE 16 #if ZT_VAULT_SUPPORT size_t curlResponseWrite(void *ptr, size_t size, size_t nmemb, std::string *data) @@ -465,7 +465,7 @@ public: unsigned int _tertiaryPort; volatile unsigned int _udpPortPickerCounter; -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE + unsigned int _incomingPacketThreadPoolSize; struct { uint8_t data[2048]; std::thread thr; @@ -474,8 +474,7 @@ public: int size; std::condition_variable cond; std::mutex lock; - } _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE]; -#endif + } _incomingPacketWorker[ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE]; // Local configuration and memo-ized information from it json _localConfig; @@ -606,8 +605,8 @@ public: _ports[1] = 0; _ports[2] = 0; -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - for(unsigned int tn=0;tn l(_incomingPacketWorker[tn].lock); for(;;) { @@ -636,7 +635,6 @@ public: } }); } -#endif #if ZT_VAULT_SUPPORT curl_global_init(CURL_GLOBAL_DEFAULT); @@ -645,17 +643,15 @@ public: virtual ~OneServiceImpl() { -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - for(unsigned int tn=0;tn= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); -#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE - unsigned long cksum = 0; - for(unsigned int i=0;isa_family) { + case AF_INET: + for(unsigned int i=0;i<4;++i) + cksum += ((const uint8_t *)(&(((const struct sockaddr_in *)from)->sin_addr.s_addr)))[i]; + break; + case AF_INET6: + for(unsigned int i=0;i<16;++i) + cksum += ((const struct sockaddr_in6 *)from)->sin6_addr.s6_addr[i]; + break; + default: + for(unsigned int i=0;i(sock); - memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); + ZT_FAST_MEMCPY(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage)); _incomingPacketWorker[tn].size = (int)len; _incomingPacketWorker[tn].lock.unlock(); _incomingPacketWorker[tn].cond.notify_all(); -#else - const ZT_ResultCode rc = _node->processWirePacket( - (void *)0, - OSUtils::now(), - reinterpret_cast(sock), - reinterpret_cast(from), // Phy<> uses sockaddr_storage, so it'll always be that big - data, - len, - &_nextBackgroundTaskDeadline); - if (ZT_ResultCode_isFatal(rc)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - } -#endif } inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)