diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index a5dd77017..e34d4b048 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -38,6 +38,7 @@ #include "Path.hpp" #include "Bond.hpp" #include "Metrics.hpp" +#include "PacketMultiplexer.hpp" namespace ZeroTier { @@ -793,7 +794,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar { Metrics::pkt_frame_in++; int32_t _flowId = ZT_QOS_NO_FLOW; - if (peer->flowHashingSupported()) { + //if (peer->flowHashingSupported()) { if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { const unsigned int etherType = at(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; @@ -855,7 +856,9 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar } } } - } + //} + + //fprintf(stderr, "IncomingPacket::_doFRAME: flowId=%d\n", _flowId); const uint64_t nwid = at(ZT_PROTO_VERB_FRAME_IDX_NETWORK_ID); const SharedPtr network(RR->node->network(nwid)); @@ -869,7 +872,8 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) { - RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); + //RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); + RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); } } } else { @@ -942,7 +946,8 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const } // fall through -- 2 means accept regardless of bridging checks or other restrictions case 2: - RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); + //RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); + RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); break; } } diff --git a/node/Node.cpp b/node/Node.cpp index 4913d1a4c..f2ad40fe0 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -35,6 +35,7 @@ #include "Network.hpp" #include "Trace.hpp" #include "Metrics.hpp" +#include "PacketMultiplexer.hpp" // FIXME: remove this suppression and actually fix warnings #ifdef __GNUC__ @@ -119,9 +120,10 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 const unsigned long mcs = sizeof(Multicaster) + (((sizeof(Multicaster) & 0xf) != 0) ? (16 - (sizeof(Multicaster) & 0xf)) : 0); const unsigned long topologys = sizeof(Topology) + (((sizeof(Topology) & 0xf) != 0) ? (16 - (sizeof(Topology) & 0xf)) : 0); const unsigned long sas = sizeof(SelfAwareness) + (((sizeof(SelfAwareness) & 0xf) != 0) ? (16 - (sizeof(SelfAwareness) & 0xf)) : 0); - const unsigned long bc = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0); + const unsigned long bcs = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0); + const unsigned long pms = sizeof(PacketMultiplexer) + (((sizeof(PacketMultiplexer) & 0xf) != 0) ? (16 - (sizeof(PacketMultiplexer) & 0xf)) : 0); - m = reinterpret_cast(::malloc(16 + ts + sws + mcs + topologys + sas + bc)); + m = reinterpret_cast(::malloc(16 + ts + sws + mcs + topologys + sas + bcs + pms)); if (!m) { throw std::bad_alloc(); } @@ -141,6 +143,8 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 RR->sa = new (m) SelfAwareness(RR); m += sas; RR->bc = new (m) Bond(RR); + m += bcs; + RR->pm = new (m) PacketMultiplexer(RR); } catch ( ... ) { if (RR->sa) { RR->sa->~SelfAwareness(); @@ -160,6 +164,9 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 if (RR->bc) { RR->bc->~Bond(); } + if (RR->pm) { + RR->pm->~PacketMultiplexer(); + } ::free(m); throw; } @@ -191,6 +198,9 @@ Node::~Node() if (RR->bc) { RR->bc->~Bond(); } + if (RR->pm) { + RR->pm->~PacketMultiplexer(); + } ::free(RR->rtmem); } diff --git a/node/Node.hpp b/node/Node.hpp index 1f74b8340..1c789a0b0 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -283,7 +283,7 @@ public: return _lowBandwidthMode; } -private: +public: RuntimeEnvironment _RR; RuntimeEnvironment *RR; void *_uPtr; // _uptr (lower case) is reserved in Visual Studio :P diff --git a/node/PacketMultiplexer.cpp b/node/PacketMultiplexer.cpp new file mode 100644 index 000000000..6f559c4b8 --- /dev/null +++ b/node/PacketMultiplexer.cpp @@ -0,0 +1,125 @@ +/* + * Copyright (c)2013-2021 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2026-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. + */ +/****/ + +#include "PacketMultiplexer.hpp" + +#include "Node.hpp" +#include "RuntimeEnvironment.hpp" + +#include +#include + +namespace ZeroTier { + +void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId) +{ + PacketRecord* packet; + _rxPacketVector_m.lock(); + if (_rxPacketVector.empty()) { + packet = new PacketRecord; + } + else { + packet = _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + + packet->tPtr = tPtr; + packet->nwid = nwid; + packet->nuptr = nuptr; + packet->source = source.toInt(); + packet->dest = dest.toInt(); + packet->etherType = etherType; + packet->vlanId = vlanId; + packet->len = len; + packet->flowId = flowId; + memcpy(packet->data, data, len); + + int bucket = flowId % _concurrency; + //fprintf(stderr, "bucket=%d\n", bucket); + _rxPacketQueues[bucket]->postLimit(packet, 2048); +} + +PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) +{ + RR = renv; + bool _enablePinning = false; + char* pinningVar = std::getenv("ZT_CPU_PINNING"); + if (pinningVar) { + int tmp = atoi(pinningVar); + if (tmp > 0) { + _enablePinning = true; + } + } + + _concurrency = 1; + char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); + if (concurrencyVar) { + int tmp = atoi(concurrencyVar); + if (tmp > 0) { + _concurrency = tmp; + } + else { + _concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2); + } + } + else { + _concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2); + } + + for (unsigned int i = 0; i < _concurrency; ++i) { + fprintf(stderr, "reserved queue for thread %d\n", i); + _rxPacketQueues.push_back(new BlockingQueue()); + } + + // Each thread picks from its own queue to feed into the core + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, _enablePinning]() { + fprintf(stderr, "created post-decode packet ingestion thread %d\n", i); + + PacketRecord* packet = nullptr; + for (;;) { + if (! _rxPacketQueues[i]->get(packet)) { + break; + } + if (! packet) { + break; + } + + //fprintf(stderr, "popped packet from queue %d\n", i); + + MAC sourceMac = MAC(packet->source); + MAC destMac = MAC(packet->dest); + + RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len); + { + Mutex::Lock l(_rxPacketVector_m); + _rxPacketVector.push_back(packet); + } + /* + if (ZT_ResultCode_isFatal(err)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + */ + } + })); + } +}; + +} // namespace ZeroTier \ No newline at end of file diff --git a/node/PacketMultiplexer.hpp b/node/PacketMultiplexer.hpp new file mode 100644 index 000000000..152bda320 --- /dev/null +++ b/node/PacketMultiplexer.hpp @@ -0,0 +1,62 @@ +/* + * Copyright (c)2013-2021 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2026-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. + */ +/****/ + +#ifndef ZT_PACKET_MULTIPLEXER_HPP +#define ZT_PACKET_MULTIPLEXER_HPP + +#include "../osdep/BlockingQueue.hpp" +#include "MAC.hpp" +#include "Mutex.hpp" +#include "RuntimeEnvironment.hpp" + +#include +#include + +namespace ZeroTier { + +struct PacketRecord { + void* tPtr; + uint64_t nwid; + void** nuptr; + uint64_t source; + uint64_t dest; + unsigned int etherType; + unsigned int vlanId; + uint8_t data[ZT_MAX_MTU]; + unsigned int len; + unsigned int flowId; +}; + +class PacketMultiplexer { + public: + const RuntimeEnvironment* RR; + + PacketMultiplexer(const RuntimeEnvironment* renv); + + void putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId); + + std::vector*> _rxPacketQueues; + + unsigned int _concurrency; + // pool + std::vector _rxPacketVector; + std::vector _rxPacketThreads; + Mutex _rxPacketVector_m, _rxPacketThreads_m; + + std::vector _rxThreads; + unsigned int _rxThreadCount; +}; + +} // namespace ZeroTier + +#endif // ZT_PACKET_MULTIPLEXER_HPP \ No newline at end of file diff --git a/node/RuntimeEnvironment.hpp b/node/RuntimeEnvironment.hpp index 019645513..274a9f265 100644 --- a/node/RuntimeEnvironment.hpp +++ b/node/RuntimeEnvironment.hpp @@ -31,6 +31,7 @@ class NetworkController; class SelfAwareness; class Trace; class Bond; +class PacketMultiplexer; /** * Holds global state for an instance of ZeroTier::Node @@ -77,6 +78,7 @@ public: Topology *topology; SelfAwareness *sa; Bond *bc; + PacketMultiplexer *pm; // This node's identity and string representations thereof Identity identity; diff --git a/node/Switch.cpp b/node/Switch.cpp index 5ea1653c2..871a55b24 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -519,7 +519,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const RR->node->putFrame(tPtr, network->id(), network->userPtr(), peerMac, from, ZT_ETHERTYPE_IPV6, 0, adv, 72); }).detach(); - + return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query. } // else no NDP emulation } // else no NDP emulation diff --git a/objects.mk b/objects.mk index d07578fb3..1d8a6c0a5 100644 --- a/objects.mk +++ b/objects.mk @@ -29,7 +29,8 @@ CORE_OBJS=\ node/Topology.o \ node/Trace.o \ node/Utils.o \ - node/Bond.o + node/Bond.o \ + node/PacketMultiplexer.o ONE_OBJS=\ controller/EmbeddedNetworkController.o \ diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index 95ce54b0a..cdc01f7b8 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -58,7 +58,6 @@ namespace ZeroTier { std::shared_ptr EthernetTap::newInstance( const char *tapDeviceType, // OS-specific, NULL for default const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -89,11 +88,11 @@ std::shared_ptr EthernetTap::newInstance( return std::shared_ptr(new MacEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); } } - } + }/ #endif // __APPLE__ #ifdef __LINUX__ - return std::shared_ptr(new LinuxEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __LINUX__ #ifdef __WINDOWS__ diff --git a/osdep/EthernetTap.hpp b/osdep/EthernetTap.hpp index c5e82470c..893e70c34 100644 --- a/osdep/EthernetTap.hpp +++ b/osdep/EthernetTap.hpp @@ -33,7 +33,6 @@ public: static std::shared_ptr newInstance( const char *tapDeviceType, // OS-specific, NULL for default const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, diff --git a/osdep/LinuxEthernetTap.cpp b/osdep/LinuxEthernetTap.cpp index 81ebedc11..160f4d76b 100644 --- a/osdep/LinuxEthernetTap.cpp +++ b/osdep/LinuxEthernetTap.cpp @@ -111,7 +111,6 @@ static void _base32_5_to_8(const uint8_t *in,char *out) LinuxEthernetTap::LinuxEthernetTap( const char *homePath, - unsigned int concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -128,7 +127,6 @@ LinuxEthernetTap::LinuxEthernetTap( _fd(0), _enabled(true), _run(true), - _concurrency(concurrency), _lastIfAddrsUpdate(0) { static std::mutex s_tapCreateLock; @@ -231,12 +229,27 @@ LinuxEthernetTap::LinuxEthernetTap( } } + int _concurrency = 1; + char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY"); + if (concurrencyVar) { + int tmp = atoi(concurrencyVar); + if (tmp > 0) { + _concurrency = tmp; + } + else { + _concurrency = std::max((unsigned int)1,std::thread::hardware_concurrency() / 2); + } + } + else { + _concurrency = std::max((unsigned int)1,std::thread::hardware_concurrency() / 2); + } + for (unsigned int i = 0; i < _concurrency; ++i) { - _rxThreads.push_back(std::thread([this, i, _enablePinning] { + _rxThreads.push_back(std::thread([this, i, _concurrency, _enablePinning] { if (_enablePinning) { int pinCore = i % _concurrency; - fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore); + fprintf(stderr, "pinning tap thread %d to core %d\n", i, pinCore); pthread_t self = pthread_self(); cpu_set_t cpuset; CPU_ZERO(&cpuset); @@ -244,7 +257,7 @@ LinuxEthernetTap::LinuxEthernetTap( int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); if (rc != 0) { - fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + fprintf(stderr, "failed to pin tap thread %d to core %d: %s\n", i, pinCore, strerror(errno)); exit(1); } } diff --git a/osdep/LinuxEthernetTap.hpp b/osdep/LinuxEthernetTap.hpp index b694b277c..383c67b7f 100644 --- a/osdep/LinuxEthernetTap.hpp +++ b/osdep/LinuxEthernetTap.hpp @@ -35,7 +35,6 @@ class LinuxEthernetTap : public EthernetTap public: LinuxEthernetTap( const char *homePath, - unsigned int _concurrency, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -68,7 +67,6 @@ private: std::string _dev; std::vector _multicastGroups; unsigned int _mtu; - unsigned int _concurrency; int _fd; int _shutdownSignalPipe[2]; std::atomic_bool _enabled; diff --git a/service/OneService.cpp b/service/OneService.cpp index 0287276bc..169fd6f91 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -46,6 +46,7 @@ #include "../node/SHA512.hpp" #include "../node/Bond.hpp" #include "../node/Peer.hpp" +#include "../node/PacketMultiplexer.hpp" #include "../osdep/Phy.hpp" #include "../osdep/OSUtils.hpp" @@ -986,7 +987,7 @@ public: #if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ if (rc != 0) { - fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno)); exit(1); } #endif @@ -3131,7 +3132,6 @@ public: n.setTap(EthernetTap::newInstance( nullptr, _homePath.c_str(), - _enableMulticore ? _rxThreadCount : 1, MAC(nwc->mac), nwc->mtu, (unsigned int)ZT_IF_METRIC,