Simplify packet critical path. Plus more platform fixes

This commit is contained in:
Joseph Henry 2024-08-22 12:59:06 -07:00
parent e734019216
commit b813ea70a5
No known key found for this signature in database
GPG Key ID: C45B33FF5EBC9344
7 changed files with 31 additions and 27 deletions

View File

@ -869,12 +869,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) { if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) {
if (RR->node->getMultithreadingEnabled()) { RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId);
RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId);
}
else {
RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen);
}
} }
} }
} else { } else {
@ -947,12 +942,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
} }
// fall through -- 2 means accept regardless of bridging checks or other restrictions // fall through -- 2 means accept regardless of bridging checks or other restrictions
case 2: case 2:
if (RR->node->getMultithreadingEnabled()) { RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId);
RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId);
}
else {
RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen);
}
break; break;
} }
} }

View File

@ -240,9 +240,8 @@ ZT_ResultCode Node::processVirtualNetworkFrame(
} }
} }
void Node::initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled) void Node::initMultithreading(unsigned int concurrency, bool cpuPinningEnabled)
{ {
_multithreadingEnabled = isEnabled;
RR->pm->setUpPostDecodeReceiveThreads(concurrency, cpuPinningEnabled); RR->pm->setUpPostDecodeReceiveThreads(concurrency, cpuPinningEnabled);
} }

View File

@ -283,12 +283,7 @@ public:
return _lowBandwidthMode; return _lowBandwidthMode;
} }
inline bool getMultithreadingEnabled() void initMultithreading(unsigned int concurrency, bool cpuPinningEnabled);
{
return _multithreadingEnabled;
}
void initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled);
public: public:
@ -339,7 +334,6 @@ public:
volatile int64_t _prngState[2]; volatile int64_t _prngState[2];
bool _online; bool _online;
bool _lowBandwidthMode; bool _lowBandwidthMode;
bool _multithreadingEnabled;
}; };
} // namespace ZeroTier } // namespace ZeroTier

View File

@ -15,6 +15,7 @@
#include "Node.hpp" #include "Node.hpp"
#include "RuntimeEnvironment.hpp" #include "RuntimeEnvironment.hpp"
#include "Constants.hpp"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -28,6 +29,16 @@ PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv)
void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId) void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId)
{ {
#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len);
return;
#endif
if (!_enabled) {
RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len);
return;
}
PacketRecord* packet; PacketRecord* packet;
_rxPacketVector_m.lock(); _rxPacketVector_m.lock();
if (_rxPacketVector.empty()) { if (_rxPacketVector.empty()) {
@ -56,9 +67,10 @@ void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const
void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled) void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled)
{ {
if (! RR->node->getMultithreadingEnabled()) { #if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
return; return;
} #endif
_enabled = true;
_concurrency = concurrency; _concurrency = concurrency;
bool _enablePinning = cpuPinningEnabled; bool _enablePinning = cpuPinningEnabled;

View File

@ -57,6 +57,7 @@ class PacketMultiplexer {
std::vector<std::thread> _rxThreads; std::vector<std::thread> _rxThreads;
unsigned int _rxThreadCount; unsigned int _rxThreadCount;
bool _enabled;
}; };
} // namespace ZeroTier } // namespace ZeroTier

View File

@ -140,7 +140,7 @@ std::shared_ptr<EthernetTap> EthernetTap::newInstance(
#endif // __NetBSD__ #endif // __NetBSD__
#ifdef __OpenBSD__ #ifdef __OpenBSD__
return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
#endif // __OpenBSD__ #endif // __OpenBSD__
#endif // ZT_SDK? #endif // ZT_SDK?

View File

@ -991,7 +991,10 @@ public:
void setUpMultithreading() void setUpMultithreading()
{ {
_node->initMultithreading(true, _concurrency, _cpuPinningEnabled); #if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
return;
#endif
_node->initMultithreading(_concurrency, _cpuPinningEnabled);
bool pinning = _cpuPinningEnabled; bool pinning = _cpuPinningEnabled;
fprintf(stderr, "Starting %d RX threads\n", _concurrency); fprintf(stderr, "Starting %d RX threads\n", _concurrency);
@ -2648,6 +2651,7 @@ public:
fprintf(stderr,"WARNING: using manually-specified secondary and/or tertiary ports. This can cause NAT issues." ZT_EOL_S); fprintf(stderr,"WARNING: using manually-specified secondary and/or tertiary ports. This can cause NAT issues." ZT_EOL_S);
} }
_portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true); _portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true);
#if defined(__LINUX__) || defined(__FreeBSD__)
_multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false); _multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false);
_concurrency = OSUtils::jsonInt(settings["concurrency"],0); _concurrency = OSUtils::jsonInt(settings["concurrency"],0);
_cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false); _cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false);
@ -2660,6 +2664,7 @@ public:
} }
setUpMultithreading(); setUpMultithreading();
} }
#endif
#ifndef ZT_SDK #ifndef ZT_SDK
const std::string up(OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT)); const std::string up(OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT));
@ -2987,7 +2992,7 @@ public:
if ((len >= 16) && (reinterpret_cast<const InetAddress*>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { if ((len >= 16) && (reinterpret_cast<const InetAddress*>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
_lastDirectReceiveFromGlobal = now; _lastDirectReceiveFromGlobal = now;
} }
#if defined(__LINUX__) || defined(__FreeBSD__)
if (_multicoreEnabled) { if (_multicoreEnabled) {
PacketRecord* packet; PacketRecord* packet;
_rxPacketVector_m.lock(); _rxPacketVector_m.lock();
@ -3008,6 +3013,7 @@ public:
_rxPacketQueue.postLimit(packet, 256 * _concurrency); _rxPacketQueue.postLimit(packet, 256 * _concurrency);
} }
else { else {
#endif
const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast<int64_t>(sock),reinterpret_cast<const struct sockaddr_storage *>(from),data,len,&_nextBackgroundTaskDeadline); const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast<int64_t>(sock),reinterpret_cast<const struct sockaddr_storage *>(from),data,len,&_nextBackgroundTaskDeadline);
if (ZT_ResultCode_isFatal(rc)) { if (ZT_ResultCode_isFatal(rc)) {
char tmp[256]; char tmp[256];
@ -3017,7 +3023,9 @@ public:
_fatalErrorMessage = tmp; _fatalErrorMessage = tmp;
this->terminate(); this->terminate();
} }
#if defined(__LINUX__) || defined(__FreeBSD__)
} }
#endif
} }