From b813ea70a5f9e7c2e91ec68461fdd132e8fc543a Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Thu, 22 Aug 2024 12:59:06 -0700 Subject: [PATCH] Simplify packet critical path. Plus more platform fixes --- node/IncomingPacket.cpp | 14 ++------------ node/Node.cpp | 3 +-- node/Node.hpp | 8 +------- node/PacketMultiplexer.cpp | 18 +++++++++++++++--- node/PacketMultiplexer.hpp | 1 + osdep/EthernetTap.cpp | 2 +- service/OneService.cpp | 12 ++++++++++-- 7 files changed, 31 insertions(+), 27 deletions(-) diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index 4d6d5ce8e..318b05730 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -869,12 +869,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) { - if (RR->node->getMultithreadingEnabled()) { - RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); - } - else { - RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); - } + RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); } } } else { @@ -947,12 +942,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const } // fall through -- 2 means accept regardless of bridging checks or other restrictions case 2: - if (RR->node->getMultithreadingEnabled()) { - RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); - } - else { - RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); - } + RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); break; } } diff --git a/node/Node.cpp b/node/Node.cpp index c7cb37df5..1f377c545 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -240,9 +240,8 @@ ZT_ResultCode Node::processVirtualNetworkFrame( } } -void Node::initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled) +void Node::initMultithreading(unsigned int concurrency, bool cpuPinningEnabled) { - _multithreadingEnabled = isEnabled; RR->pm->setUpPostDecodeReceiveThreads(concurrency, cpuPinningEnabled); } diff --git a/node/Node.hpp b/node/Node.hpp index da2d5427f..f9d05483a 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -283,12 +283,7 @@ public: return _lowBandwidthMode; } - inline bool getMultithreadingEnabled() - { - return _multithreadingEnabled; - } - - void initMultithreading(bool isEnabled, unsigned int concurrency, bool cpuPinningEnabled); + void initMultithreading(unsigned int concurrency, bool cpuPinningEnabled); public: @@ -339,7 +334,6 @@ public: volatile int64_t _prngState[2]; bool _online; bool _lowBandwidthMode; - bool _multithreadingEnabled; }; } // namespace ZeroTier diff --git a/node/PacketMultiplexer.cpp b/node/PacketMultiplexer.cpp index 401e1db9f..9035ccc0c 100644 --- a/node/PacketMultiplexer.cpp +++ b/node/PacketMultiplexer.cpp @@ -15,6 +15,7 @@ #include "Node.hpp" #include "RuntimeEnvironment.hpp" +#include "Constants.hpp" #include #include @@ -28,6 +29,16 @@ PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId) { +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len); + return; +#endif + + if (!_enabled) { + RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len); + return; + } + PacketRecord* packet; _rxPacketVector_m.lock(); if (_rxPacketVector.empty()) { @@ -56,9 +67,10 @@ void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled) { - if (! RR->node->getMultithreadingEnabled()) { - return; - } +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + return; +#endif + _enabled = true; _concurrency = concurrency; bool _enablePinning = cpuPinningEnabled; diff --git a/node/PacketMultiplexer.hpp b/node/PacketMultiplexer.hpp index 8cd592a10..4753180ed 100644 --- a/node/PacketMultiplexer.hpp +++ b/node/PacketMultiplexer.hpp @@ -57,6 +57,7 @@ class PacketMultiplexer { std::vector _rxThreads; unsigned int _rxThreadCount; + bool _enabled; }; } // namespace ZeroTier diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index 4395bc404..0be209ecd 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -140,7 +140,7 @@ std::shared_ptr EthernetTap::newInstance( #endif // __NetBSD__ #ifdef __OpenBSD__ - return std::shared_ptr(new BSDEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __OpenBSD__ #endif // ZT_SDK? diff --git a/service/OneService.cpp b/service/OneService.cpp index acce0ec9f..8d4b5bdfd 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -991,7 +991,10 @@ public: void setUpMultithreading() { - _node->initMultithreading(true, _concurrency, _cpuPinningEnabled); +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + return; +#endif + _node->initMultithreading(_concurrency, _cpuPinningEnabled); bool pinning = _cpuPinningEnabled; fprintf(stderr, "Starting %d RX threads\n", _concurrency); @@ -2648,6 +2651,7 @@ public: fprintf(stderr,"WARNING: using manually-specified secondary and/or tertiary ports. This can cause NAT issues." ZT_EOL_S); } _portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true); +#if defined(__LINUX__) || defined(__FreeBSD__) _multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false); _concurrency = OSUtils::jsonInt(settings["concurrency"],0); _cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false); @@ -2660,6 +2664,7 @@ public: } setUpMultithreading(); } +#endif #ifndef ZT_SDK const std::string up(OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT)); @@ -2987,7 +2992,7 @@ public: if ((len >= 16) && (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { _lastDirectReceiveFromGlobal = now; } - +#if defined(__LINUX__) || defined(__FreeBSD__) if (_multicoreEnabled) { PacketRecord* packet; _rxPacketVector_m.lock(); @@ -3008,6 +3013,7 @@ public: _rxPacketQueue.postLimit(packet, 256 * _concurrency); } else { +#endif const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); if (ZT_ResultCode_isFatal(rc)) { char tmp[256]; @@ -3017,7 +3023,9 @@ public: _fatalErrorMessage = tmp; this->terminate(); } +#if defined(__LINUX__) || defined(__FreeBSD__) } +#endif }