diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index dc268b1ab..2537c0fb6 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -38,6 +38,7 @@ #include "Path.hpp" #include "Bond.hpp" #include "Metrics.hpp" +#include "PacketMultiplexer.hpp" namespace ZeroTier { @@ -792,66 +793,65 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar { Metrics::pkt_frame_in++; int32_t _flowId = ZT_QOS_NO_FLOW; - if (peer->flowHashingSupported()) { - if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { - const unsigned int etherType = at(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); - const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; - const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; - if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) { - uint16_t srcPort = 0; - uint16_t dstPort = 0; - uint8_t proto = (reinterpret_cast(frameData)[9]); - const unsigned int headerLen = 4 * (reinterpret_cast(frameData)[0] & 0xf); - switch(proto) { - case 0x01: // ICMP - //flowId = 0x01; - break; - // All these start with 16-bit source and destination port in that order - case 0x06: // TCP - case 0x11: // UDP - case 0x84: // SCTP - case 0x88: // UDPLite - if (frameLen > (headerLen + 4)) { - unsigned int pos = headerLen + 0; - srcPort = (reinterpret_cast(frameData)[pos++]) << 8; - srcPort |= (reinterpret_cast(frameData)[pos]); - pos++; - dstPort = (reinterpret_cast(frameData)[pos++]) << 8; - dstPort |= (reinterpret_cast(frameData)[pos]); - _flowId = dstPort ^ srcPort ^ proto; - } - break; - } + if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { + const unsigned int etherType = at(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); + const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; + const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; + + if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) { + uint16_t srcPort = 0; + uint16_t dstPort = 0; + uint8_t proto = (reinterpret_cast(frameData)[9]); + const unsigned int headerLen = 4 * (reinterpret_cast(frameData)[0] & 0xf); + switch(proto) { + case 0x01: // ICMP + //flowId = 0x01; + break; + // All these start with 16-bit source and destination port in that order + case 0x06: // TCP + case 0x11: // UDP + case 0x84: // SCTP + case 0x88: // UDPLite + if (frameLen > (headerLen + 4)) { + unsigned int pos = headerLen + 0; + srcPort = (reinterpret_cast(frameData)[pos++]) << 8; + srcPort |= (reinterpret_cast(frameData)[pos]); + pos++; + dstPort = (reinterpret_cast(frameData)[pos++]) << 8; + dstPort |= (reinterpret_cast(frameData)[pos]); + _flowId = dstPort ^ srcPort ^ proto; + } + break; } + } - if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) { - uint16_t srcPort = 0; - uint16_t dstPort = 0; - unsigned int pos; - unsigned int proto; - _ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto); - switch(proto) { - case 0x3A: // ICMPv6 - //flowId = 0x3A; - break; - // All these start with 16-bit source and destination port in that order - case 0x06: // TCP - case 0x11: // UDP - case 0x84: // SCTP - case 0x88: // UDPLite - if (frameLen > (pos + 4)) { - srcPort = (reinterpret_cast(frameData)[pos++]) << 8; - srcPort |= (reinterpret_cast(frameData)[pos]); - pos++; - dstPort = (reinterpret_cast(frameData)[pos++]) << 8; - dstPort |= (reinterpret_cast(frameData)[pos]); - _flowId = dstPort ^ srcPort ^ proto; - } - break; - default: - break; - } + if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) { + uint16_t srcPort = 0; + uint16_t dstPort = 0; + unsigned int pos; + unsigned int proto; + _ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto); + switch(proto) { + case 0x3A: // ICMPv6 + //flowId = 0x3A; + break; + // All these start with 16-bit source and destination port in that order + case 0x06: // TCP + case 0x11: // UDP + case 0x84: // SCTP + case 0x88: // UDPLite + if (frameLen > (pos + 4)) { + srcPort = (reinterpret_cast(frameData)[pos++]) << 8; + srcPort |= (reinterpret_cast(frameData)[pos]); + pos++; + dstPort = (reinterpret_cast(frameData)[pos++]) << 8; + dstPort |= (reinterpret_cast(frameData)[pos]); + _flowId = dstPort ^ srcPort ^ proto; + } + break; + default: + break; } } } @@ -868,7 +868,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; const uint8_t *const frameData = reinterpret_cast(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) { - RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen); + RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId); } } } else { @@ -941,7 +941,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const } // fall through -- 2 means accept regardless of bridging checks or other restrictions case 2: - RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen); + RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId); break; } } diff --git a/node/Node.cpp b/node/Node.cpp index 5c561fedc..1f377c545 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -35,6 +35,7 @@ #include "Network.hpp" #include "Trace.hpp" #include "Metrics.hpp" +#include "PacketMultiplexer.hpp" // FIXME: remove this suppression and actually fix warnings #ifdef __GNUC__ @@ -119,9 +120,10 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 const unsigned long mcs = sizeof(Multicaster) + (((sizeof(Multicaster) & 0xf) != 0) ? (16 - (sizeof(Multicaster) & 0xf)) : 0); const unsigned long topologys = sizeof(Topology) + (((sizeof(Topology) & 0xf) != 0) ? (16 - (sizeof(Topology) & 0xf)) : 0); const unsigned long sas = sizeof(SelfAwareness) + (((sizeof(SelfAwareness) & 0xf) != 0) ? (16 - (sizeof(SelfAwareness) & 0xf)) : 0); - const unsigned long bc = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0); + const unsigned long bcs = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0); + const unsigned long pms = sizeof(PacketMultiplexer) + (((sizeof(PacketMultiplexer) & 0xf) != 0) ? (16 - (sizeof(PacketMultiplexer) & 0xf)) : 0); - m = reinterpret_cast(::malloc(16 + ts + sws + mcs + topologys + sas + bc)); + m = reinterpret_cast(::malloc(16 + ts + sws + mcs + topologys + sas + bcs + pms)); if (!m) { throw std::bad_alloc(); } @@ -141,6 +143,8 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 RR->sa = new (m) SelfAwareness(RR); m += sas; RR->bc = new (m) Bond(RR); + m += bcs; + RR->pm = new (m) PacketMultiplexer(RR); } catch ( ... ) { if (RR->sa) { RR->sa->~SelfAwareness(); @@ -160,6 +164,9 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64 if (RR->bc) { RR->bc->~Bond(); } + if (RR->pm) { + RR->pm->~PacketMultiplexer(); + } ::free(m); throw; } @@ -191,6 +198,9 @@ Node::~Node() if (RR->bc) { RR->bc->~Bond(); } + if (RR->pm) { + RR->pm->~PacketMultiplexer(); + } ::free(RR->rtmem); } @@ -230,6 +240,11 @@ ZT_ResultCode Node::processVirtualNetworkFrame( } } +void Node::initMultithreading(unsigned int concurrency, bool cpuPinningEnabled) +{ + RR->pm->setUpPostDecodeReceiveThreads(concurrency, cpuPinningEnabled); +} + // Closure used to ping upstream and active/online peers class _PingPeersThatNeedPing { diff --git a/node/Node.hpp b/node/Node.hpp index 81c78249b..f9d05483a 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -283,7 +283,10 @@ public: return _lowBandwidthMode; } -private: + void initMultithreading(unsigned int concurrency, bool cpuPinningEnabled); + + +public: RuntimeEnvironment _RR; RuntimeEnvironment *RR; void *_uPtr; // _uptr (lower case) is reserved in Visual Studio :P diff --git a/node/PacketMultiplexer.cpp b/node/PacketMultiplexer.cpp new file mode 100644 index 000000000..a1dc835a1 --- /dev/null +++ b/node/PacketMultiplexer.cpp @@ -0,0 +1,122 @@ +/* + * Copyright (c)2013-2021 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2026-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. + */ +/****/ + +#include "PacketMultiplexer.hpp" + +#include "Node.hpp" +#include "RuntimeEnvironment.hpp" +#include "Constants.hpp" + +#include +#include + +namespace ZeroTier { + +PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv) +{ + RR = renv; +}; + +void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId) +{ +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len); + return; +#endif + + if (!_enabled) { + RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len); + return; + } + + PacketRecord* packet; + _rxPacketVector_m.lock(); + if (_rxPacketVector.empty()) { + packet = new PacketRecord; + } + else { + packet = _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + + packet->tPtr = tPtr; + packet->nwid = nwid; + packet->nuptr = nuptr; + packet->source = source.toInt(); + packet->dest = dest.toInt(); + packet->etherType = etherType; + packet->vlanId = vlanId; + packet->len = len; + packet->flowId = flowId; + memcpy(packet->data, data, len); + + int bucket = flowId % _concurrency; + _rxPacketQueues[bucket]->postLimit(packet, 2048); +} + +void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled) +{ +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + return; +#endif + _enabled = true; + _concurrency = concurrency; + bool _enablePinning = cpuPinningEnabled; + + for (unsigned int i = 0; i < _concurrency; ++i) { + fprintf(stderr, "Reserved queue for thread %d\n", i); + _rxPacketQueues.push_back(new BlockingQueue()); + } + + // Each thread picks from its own queue to feed into the core + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, _enablePinning]() { + fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i); + + PacketRecord* packet = nullptr; + for (;;) { + if (! _rxPacketQueues[i]->get(packet)) { + break; + } + if (! packet) { + break; + } + + // fprintf(stderr, "popped packet from queue %d\n", i); + + MAC sourceMac = MAC(packet->source); + MAC destMac = MAC(packet->dest); + + RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len); + { + Mutex::Lock l(_rxPacketVector_m); + _rxPacketVector.push_back(packet); + } + /* + if (ZT_ResultCode_isFatal(err)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + break; + } + */ + } + })); + } +} + +} // namespace ZeroTier \ No newline at end of file diff --git a/node/PacketMultiplexer.hpp b/node/PacketMultiplexer.hpp new file mode 100644 index 000000000..4753180ed --- /dev/null +++ b/node/PacketMultiplexer.hpp @@ -0,0 +1,65 @@ +/* + * Copyright (c)2013-2021 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2026-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. + */ +/****/ + +#ifndef ZT_PACKET_MULTIPLEXER_HPP +#define ZT_PACKET_MULTIPLEXER_HPP + +#include "../osdep/BlockingQueue.hpp" +#include "MAC.hpp" +#include "Mutex.hpp" +#include "RuntimeEnvironment.hpp" + +#include +#include + +namespace ZeroTier { + +struct PacketRecord { + void* tPtr; + uint64_t nwid; + void** nuptr; + uint64_t source; + uint64_t dest; + unsigned int etherType; + unsigned int vlanId; + uint8_t data[ZT_MAX_MTU]; + unsigned int len; + unsigned int flowId; +}; + +class PacketMultiplexer { + public: + const RuntimeEnvironment* RR; + + PacketMultiplexer(const RuntimeEnvironment* renv); + + void setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled); + + void putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId); + + std::vector*> _rxPacketQueues; + + unsigned int _concurrency; + // pool + std::vector _rxPacketVector; + std::vector _rxPacketThreads; + Mutex _rxPacketVector_m, _rxPacketThreads_m; + + std::vector _rxThreads; + unsigned int _rxThreadCount; + bool _enabled; +}; + +} // namespace ZeroTier + +#endif // ZT_PACKET_MULTIPLEXER_HPP \ No newline at end of file diff --git a/node/RuntimeEnvironment.hpp b/node/RuntimeEnvironment.hpp index eefa2eeda..6b14f771c 100644 --- a/node/RuntimeEnvironment.hpp +++ b/node/RuntimeEnvironment.hpp @@ -31,6 +31,7 @@ class NetworkController; class SelfAwareness; class Trace; class Bond; +class PacketMultiplexer; /** * Holds global state for an instance of ZeroTier::Node @@ -77,6 +78,7 @@ public: Topology *topology; SelfAwareness *sa; Bond *bc; + PacketMultiplexer *pm; // This node's identity and string representations thereof Identity identity; diff --git a/node/Switch.cpp b/node/Switch.cpp index 4fea0d0e9..7664f7a48 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -519,7 +519,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const RR->node->putFrame(tPtr, network->id(), network->userPtr(), peerMac, from, ZT_ETHERTYPE_IPV6, 0, adv, 72); }).detach(); - + return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query. } // else no NDP emulation } // else no NDP emulation diff --git a/objects.mk b/objects.mk index d07578fb3..1d8a6c0a5 100644 --- a/objects.mk +++ b/objects.mk @@ -29,7 +29,8 @@ CORE_OBJS=\ node/Topology.o \ node/Trace.o \ node/Utils.o \ - node/Bond.o + node/Bond.o \ + node/PacketMultiplexer.o ONE_OBJS=\ controller/EmbeddedNetworkController.o \ diff --git a/osdep/BSDEthernetTap.cpp b/osdep/BSDEthernetTap.cpp index 3114306ee..1a240c1a1 100644 --- a/osdep/BSDEthernetTap.cpp +++ b/osdep/BSDEthernetTap.cpp @@ -39,7 +39,9 @@ #include #include #include +#include +#include #include #include #include @@ -53,6 +55,7 @@ #include "BSDEthernetTap.hpp" #define ZT_BASE32_CHARS "0123456789abcdefghijklmnopqrstuv" +#define ZT_TAP_BUF_SIZE (1024 * 16) // ff:ff:ff:ff:ff:ff with no ADI static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0); @@ -61,6 +64,8 @@ namespace ZeroTier { BSDEthernetTap::BSDEthernetTap( const char *homePath, + unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -69,6 +74,8 @@ BSDEthernetTap::BSDEthernetTap( void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int), void *arg) : _handler(handler), + _concurrency(concurrency), + _pinning(pinning), _arg(arg), _nwid(nwid), _mtu(mtu), @@ -195,11 +202,9 @@ BSDEthernetTap::BSDEthernetTap( BSDEthernetTap::~BSDEthernetTap() { ::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit - Thread::join(_thread); ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); - long cpid = (long)vfork(); if (cpid == 0) { #ifdef ZT_TRACE @@ -211,6 +216,10 @@ BSDEthernetTap::~BSDEthernetTap() int exitcode = -1; ::waitpid(cpid,&exitcode,0); } + Thread::join(_thread); + for (std::thread &t : _rxThreads) { + t.join(); + } } void BSDEthernetTap::setEnabled(bool en) @@ -418,53 +427,75 @@ void BSDEthernetTap::setMtu(unsigned int mtu) void BSDEthernetTap::threadMain() throw() { - fd_set readfds,nullfds; - MAC to,from; - int n,nfds,r; - char getBuf[ZT_MAX_MTU + 64]; - // Wait for a moment after startup -- wait for Network to finish // constructing itself. Thread::sleep(500); - FD_ZERO(&readfds); - FD_ZERO(&nullfds); - nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; + for (unsigned int i = 0; i < _concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, _pinning] { - r = 0; - for(;;) { - FD_SET(_shutdownSignalPipe[0],&readfds); - FD_SET(_fd,&readfds); - select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); - - if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread - break; - - if (FD_ISSET(_fd,&readfds)) { - n = (int)::read(_fd,getBuf + r,sizeof(getBuf) - r); - if (n < 0) { - if ((errno != EINTR)&&(errno != ETIMEDOUT)) - break; - } else { - // Some tap drivers like to send the ethernet frame and the - // payload in two chunks, so handle that by accumulating - // data until we have at least a frame. - r += n; - if (r > 14) { - if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms - r = _mtu + 14; - - if (_enabled) { - to.setTo(getBuf,6); - from.setTo(getBuf + 6,6); - unsigned int etherType = ntohs(((const uint16_t *)getBuf)[6]); - _handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(getBuf + 14),r - 14); - } - - r = 0; + if (_pinning) { + int pinCore = i % _concurrency; + fprintf(stderr, "Pinning thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); + //int rc = sched_setaffinity(self, sizeof(cpu_set_t), &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + if (rc != 0) + { + fprintf(stderr, "Failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); } } - } + + uint8_t b[ZT_TAP_BUF_SIZE]; + MAC to, from; + fd_set readfds, nullfds; + int n, nfds, r; + + FD_ZERO(&readfds); + FD_ZERO(&nullfds); + nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; + + r = 0; + + for(;;) { + FD_SET(_shutdownSignalPipe[0],&readfds); + FD_SET(_fd,&readfds); + select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); + + if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread + break; + + if (FD_ISSET(_fd,&readfds)) { + n = (int)::read(_fd,b + r,sizeof(b) - r); + if (n < 0) { + if ((errno != EINTR)&&(errno != ETIMEDOUT)) + break; + } else { + // Some tap drivers like to send the ethernet frame and the + // payload in two chunks, so handle that by accumulating + // data until we have at least a frame. + r += n; + if (r > 14) { + if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms + r = _mtu + 14; + + if (_enabled) { + to.setTo(b,6); + from.setTo(b + 6,6); + unsigned int etherType = ntohs(((const uint16_t *)b)[6]); + _handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(b + 14),r - 14); + } + + r = 0; + } + } + } + } + })); } } diff --git a/osdep/BSDEthernetTap.hpp b/osdep/BSDEthernetTap.hpp index 9700fb365..50e2e6e8b 100644 --- a/osdep/BSDEthernetTap.hpp +++ b/osdep/BSDEthernetTap.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "../node/Constants.hpp" #include "../node/MulticastGroup.hpp" @@ -34,6 +35,8 @@ class BSDEthernetTap : public EthernetTap public: BSDEthernetTap( const char *homePath, + unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -62,6 +65,8 @@ public: private: void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int); void *_arg; + unsigned int _concurrency; + bool _pinning; uint64_t _nwid; Thread _thread; std::string _dev; @@ -73,6 +78,7 @@ private: volatile bool _enabled; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/osdep/EthernetTap.cpp b/osdep/EthernetTap.cpp index fdda8fe15..0be209ecd 100644 --- a/osdep/EthernetTap.cpp +++ b/osdep/EthernetTap.cpp @@ -57,6 +57,8 @@ namespace ZeroTier { std::shared_ptr EthernetTap::newInstance( const char *tapDeviceType, // OS-specific, NULL for default + unsigned int concurrency, + bool pinning, const char *homePath, const MAC &mac, unsigned int mtu, @@ -92,7 +94,7 @@ std::shared_ptr EthernetTap::newInstance( #endif // __APPLE__ #ifdef __LINUX__ - return std::shared_ptr(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new LinuxEthernetTap(homePath,concurrency,pinning,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __LINUX__ #ifdef __WINDOWS__ @@ -130,7 +132,7 @@ std::shared_ptr EthernetTap::newInstance( #endif // __WINDOWS__ #ifdef __FreeBSD__ - return std::shared_ptr(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg)); + return std::shared_ptr(new BSDEthernetTap(homePath,concurrency,pinning,mac,mtu,metric,nwid,friendlyName,handler,arg)); #endif // __FreeBSD__ #ifdef __NetBSD__ diff --git a/osdep/EthernetTap.hpp b/osdep/EthernetTap.hpp index e6833c33d..1d97f1256 100644 --- a/osdep/EthernetTap.hpp +++ b/osdep/EthernetTap.hpp @@ -32,6 +32,8 @@ class EthernetTap public: static std::shared_ptr newInstance( const char *tapDeviceType, // OS-specific, NULL for default + unsigned int concurrency, + bool pinning, const char *homePath, const MAC &mac, unsigned int mtu, diff --git a/osdep/LinuxEthernetTap.cpp b/osdep/LinuxEthernetTap.cpp index 11dd8c0cf..14929d176 100644 --- a/osdep/LinuxEthernetTap.cpp +++ b/osdep/LinuxEthernetTap.cpp @@ -60,7 +60,7 @@ #define IFNAMSIZ 16 #endif -#define ZT_TAP_BUF_SIZE 16384 +#define ZT_TAP_BUF_SIZE (1024 * 16) // ff:ff:ff:ff:ff:ff with no ADI static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0); @@ -68,7 +68,7 @@ static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC namespace ZeroTier { // determine if we're running a really old linux kernel. -// Kernels in the 2.6.x series don't behave the same when bringing up +// Kernels in the 2.6.x series don't behave the same when bringing up // the tap devices. // // Returns true if the kernel major version is < 3 @@ -111,6 +111,8 @@ static void _base32_5_to_8(const uint8_t *in,char *out) LinuxEthernetTap::LinuxEthernetTap( const char *homePath, + unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -220,135 +222,155 @@ LinuxEthernetTap::LinuxEthernetTap( (void)::pipe(_shutdownSignalPipe); - _tapReaderThread = std::thread([this]{ - uint8_t b[ZT_TAP_BUF_SIZE]; - fd_set readfds,nullfds; - int n,nfds,r; - std::vector buffers; - struct ifreq ifr; + for (unsigned int i = 0; i < concurrency; ++i) { + _rxThreads.push_back(std::thread([this, i, concurrency, pinning] { - memset(&ifr,0,sizeof(ifr)); - strcpy(ifr.ifr_name,_dev.c_str()); + if (pinning) { + int pinCore = i % concurrency; + fprintf(stderr, "Pinning tap thread %d to core %d\n", i, pinCore); + pthread_t self = pthread_self(); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); + int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); + if (rc != 0) + { + fprintf(stderr, "Failed to pin tap thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + exit(1); + } + } - const int sock = socket(AF_INET,SOCK_DGRAM,0); - if (sock <= 0) - return; + uint8_t b[ZT_TAP_BUF_SIZE]; + fd_set readfds, nullfds; + int n, nfds, r; + if (i == 0) { + struct ifreq ifr; + memset(&ifr, 0, sizeof(ifr)); + strcpy(ifr.ifr_name, _dev.c_str()); - if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); - return; - } + const int sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock <= 0) + return; - ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; - _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6); - if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); - return; - } + if (ioctl(sock, SIOCGIFFLAGS, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); + return; + } - usleep(100000); + ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; + _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6); + if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); + return; + } + + usleep(100000); + + if (isOldLinuxKernel()) { + ifr.ifr_ifru.ifru_mtu = (int)_mtu; + if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + return; + } + + usleep(100000); + } + + ifr.ifr_flags |= IFF_MULTICAST; + ifr.ifr_flags |= IFF_UP; + if (ioctl(sock, SIOCSIFFLAGS, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); + return; + } + + usleep(100000); + + if (! isOldLinuxKernel()) { + ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; + _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6); + if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); + return; + } + + ifr.ifr_ifru.ifru_mtu = (int)_mtu; + if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) { + ::close(sock); + printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + return; + } + } + + fcntl(_fd, F_SETFL, O_NONBLOCK); - if (isOldLinuxKernel()) { - ifr.ifr_ifru.ifru_mtu = (int)_mtu; - if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) { ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); + } + + if (! _run) { return; } - usleep(100000); - } - + FD_ZERO(&readfds); + FD_ZERO(&nullfds); + nfds = (int)std::max(_shutdownSignalPipe[0], _fd) + 1; - ifr.ifr_flags |= IFF_MULTICAST; - ifr.ifr_flags |= IFF_UP; - if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n"); - return; - } + r = 0; + for (;;) { + FD_SET(_shutdownSignalPipe[0], &readfds); + FD_SET(_fd, &readfds); + select(nfds, &readfds, &nullfds, &nullfds, (struct timeval*)0); - usleep(100000); + if (FD_ISSET(_shutdownSignalPipe[0], &readfds)) { + break; + } + if (FD_ISSET(_fd, &readfds)) { + for (;;) { + // read until there are no more packets, then return to outer select() loop + n = (int)::read(_fd, b + r, ZT_TAP_BUF_SIZE - r); + if (n > 0) { + // Some tap drivers like to send the ethernet frame and the + // payload in two chunks, so handle that by accumulating + // data until we have at least a frame. + r += n; + if (r > 14) { + if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms + r = _mtu + 14; - if (!isOldLinuxKernel()) { - ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER; - _mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6); - if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n"); - return; - } + if (_enabled) { + MAC to(b, 6), from(b + 6, 6); + unsigned int etherType = Utils::ntoh(((const uint16_t*)b)[6]); + _handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void*)(b + 14), (unsigned int)(r - 14)); + } - ifr.ifr_ifru.ifru_mtu = (int)_mtu; - if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) { - ::close(sock); - printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n"); - return; - } - } - - fcntl(_fd,F_SETFL,O_NONBLOCK); - - ::close(sock); - - if (!_run) - return; - - FD_ZERO(&readfds); - FD_ZERO(&nullfds); - nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1; - - r = 0; - for(;;) { - FD_SET(_shutdownSignalPipe[0],&readfds); - FD_SET(_fd,&readfds); - select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0); - - if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) - break; - - if (FD_ISSET(_fd,&readfds)) { - for(;;) { // read until there are no more packets, then return to outer select() loop - n = (int)::read(_fd,b + r,ZT_TAP_BUF_SIZE - r); - if (n > 0) { - // Some tap drivers like to send the ethernet frame and the - // payload in two chunks, so handle that by accumulating - // data until we have at least a frame. - r += n; - if (r > 14) { - if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms - r = _mtu + 14; - - if (_enabled) { - //_tapq.post(std::pair(buf,r)); - //buf = nullptr; - MAC to(b, 6),from(b + 6, 6); - unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]); - _handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(r - 14)); + r = 0; } - - r = 0; } - } else { - r = 0; - break; + else { + r = 0; + break; + } } } } - } - }); + })); + } } LinuxEthernetTap::~LinuxEthernetTap() { _run = false; (void)::write(_shutdownSignalPipe[1],"\0",1); - _tapReaderThread.join(); ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); + for (std::thread &t : _rxThreads) { + t.join(); + } } void LinuxEthernetTap::setEnabled(bool en) diff --git a/osdep/LinuxEthernetTap.hpp b/osdep/LinuxEthernetTap.hpp index 6353f8661..41e299823 100644 --- a/osdep/LinuxEthernetTap.hpp +++ b/osdep/LinuxEthernetTap.hpp @@ -26,6 +26,7 @@ #include #include "../node/MulticastGroup.hpp" #include "EthernetTap.hpp" +#include "BlockingQueue.hpp" namespace ZeroTier { @@ -34,6 +35,8 @@ class LinuxEthernetTap : public EthernetTap public: LinuxEthernetTap( const char *homePath, + unsigned int concurrency, + bool pinning, const MAC &mac, unsigned int mtu, unsigned int metric, @@ -57,9 +60,6 @@ public: virtual void setMtu(unsigned int mtu); virtual void setDns(const char *domain, const std::vector &servers) {} - - - private: void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int); void *_arg; @@ -73,9 +73,9 @@ private: int _shutdownSignalPipe[2]; std::atomic_bool _enabled; std::atomic_bool _run; - std::thread _tapReaderThread; mutable std::vector _ifaddrs; mutable uint64_t _lastIfAddrsUpdate; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/osdep/MacEthernetTapAgent.c b/osdep/MacEthernetTapAgent.c index 0b2fcb85b..e74c66940 100644 --- a/osdep/MacEthernetTapAgent.c +++ b/osdep/MacEthernetTapAgent.c @@ -32,7 +32,7 @@ * All this stuff is basically undocumented. A lot of tracing through * the Darwin/XNU kernel source was required to figure out how to make * this actually work. - * + * * We hope to develop a DriverKit-based driver in the near-mid future to * replace this weird hack, but it works for now through Big Sur in our * testing. diff --git a/osdep/MacKextEthernetTap.cpp b/osdep/MacKextEthernetTap.cpp index d52d67947..e06560300 100644 --- a/osdep/MacKextEthernetTap.cpp +++ b/osdep/MacKextEthernetTap.cpp @@ -447,7 +447,9 @@ MacKextEthernetTap::~MacKextEthernetTap() ::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit Thread::join(_thread); - + for (std::thread &t : _rxThreads) { + t.join(); + } ::close(_fd); ::close(_shutdownSignalPipe[0]); ::close(_shutdownSignalPipe[1]); diff --git a/osdep/MacKextEthernetTap.hpp b/osdep/MacKextEthernetTap.hpp index 46b0915f1..aede92867 100644 --- a/osdep/MacKextEthernetTap.hpp +++ b/osdep/MacKextEthernetTap.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "../node/Constants.hpp" #include "../node/MAC.hpp" @@ -75,6 +76,7 @@ private: int _fd; int _shutdownSignalPipe[2]; volatile bool _enabled; + std::vector _rxThreads; }; } // namespace ZeroTier diff --git a/service/OneService.cpp b/service/OneService.cpp index 4a4962b78..298b2b26e 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -16,7 +16,6 @@ #include #include #include - #include #include #include @@ -26,6 +25,11 @@ #include #include +#ifdef __FreeBSD__ +#include +#include +#endif + #include "../version.h" #include "../include/ZeroTierOne.h" @@ -42,6 +46,7 @@ #include "../node/SHA512.hpp" #include "../node/Bond.hpp" #include "../node/Peer.hpp" +#include "../node/PacketMultiplexer.hpp" #include "../osdep/Phy.hpp" #include "../osdep/OSUtils.hpp" @@ -759,7 +764,7 @@ struct TcpConnection Mutex writeq_m; }; -struct OneServiceIncomingPacket +struct PacketRecord { uint64_t now; int64_t sock; @@ -786,14 +791,22 @@ public: SoftwareUpdater *_updater; bool _updateAutoApply; - httplib::Server _controlPlane; + httplib::Server _controlPlane; httplib::Server _controlPlaneV6; - std::thread _serverThread; + std::thread _serverThread; std::thread _serverThreadV6; bool _serverThreadRunning; bool _serverThreadRunningV6; - bool _allowTcpFallbackRelay; + BlockingQueue _rxPacketQueue; + std::vector _rxPacketVector; + std::vector _rxPacketThreads; + Mutex _rxPacketVector_m,_rxPacketThreads_m; + bool _multicoreEnabled; + bool _cpuPinningEnabled; + unsigned int _concurrency; + + bool _allowTcpFallbackRelay; bool _forceTcpRelay; bool _allowSecondaryPort; bool _enableWebServer; @@ -844,8 +857,6 @@ public: // Deadline for the next background task service function volatile int64_t _nextBackgroundTaskDeadline; - - std::map _nets; Mutex _nets_m; @@ -892,9 +903,9 @@ public: ,_node((Node *)0) ,_updater((SoftwareUpdater *)0) ,_updateAutoApply(false) - ,_controlPlane() + ,_controlPlane() ,_controlPlaneV6() - ,_serverThread() + ,_serverThread() ,_serverThreadV6() ,_serverThreadRunning(false) ,_serverThreadRunningV6(false) @@ -928,9 +939,9 @@ public: _ports[1] = 0; _ports[2] = 0; - prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); - prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5)); - prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom"); + prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr); + prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5)); + prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom"); #if ZT_VAULT_SUPPORT curl_global_init(CURL_GLOBAL_DEFAULT); @@ -942,20 +953,34 @@ public: #ifdef __WINDOWS__ WinFWHelper::removeICMPRules(); #endif + + _rxPacketQueue.stop(); + _rxPacketThreads_m.lock(); + for(auto t=_rxPacketThreads.begin();t!=_rxPacketThreads.end();++t) { + t->join(); + } + _rxPacketThreads_m.unlock(); _binder.closeAll(_phy); #if ZT_VAULT_SUPPORT curl_global_cleanup(); #endif - _controlPlane.stop(); + _controlPlane.stop(); if (_serverThreadRunning) { - _serverThread.join(); + _serverThread.join(); } _controlPlaneV6.stop(); if (_serverThreadRunningV6) { _serverThreadV6.join(); } + _rxPacketVector_m.lock(); + while (!_rxPacketVector.empty()) { + delete _rxPacketVector.back(); + _rxPacketVector.pop_back(); + } + _rxPacketVector_m.unlock(); + #ifdef ZT_USE_MINIUPNPC delete _portMapper; @@ -964,6 +989,15 @@ public: delete _rc; } + void setUpMultithreading() + { +#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__) + return; +#endif + _node->initMultithreading(_concurrency, _cpuPinningEnabled); + bool pinning = _cpuPinningEnabled; + } + virtual ReasonForTermination run() { try { @@ -1272,6 +1306,9 @@ public: const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 500; clockShouldBe = now + (int64_t)delay; _phy.poll(delay); + + + } } catch (std::exception &e) { Mutex::Lock _l(_termReason_m); @@ -2562,7 +2599,25 @@ public: fprintf(stderr,"WARNING: using manually-specified secondary and/or tertiary ports. This can cause NAT issues." ZT_EOL_S); } _portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true); - _node->setLowBandwidthMode(OSUtils::jsonBool(settings["lowBandwidthMode"],false)); +#if defined(__LINUX__) || defined(__FreeBSD__) + _multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false); + _concurrency = OSUtils::jsonInt(settings["concurrency"],1); + _cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false); + if (_multicoreEnabled) { + unsigned int maxConcurrency = std::thread::hardware_concurrency(); + if (_concurrency <= 1 || _concurrency >= maxConcurrency) { + unsigned int conservativeDefault = (std::thread::hardware_concurrency() >= 4 ? 2 : 1); + fprintf(stderr, "Concurrency level provided (%d) is invalid, assigning conservative default value of (%d)\n", _concurrency, conservativeDefault); + _concurrency = conservativeDefault; + } + setUpMultithreading(); + } + else { + // Force values in case the user accidentally defined them with multicore disabled + _concurrency = 1; + _cpuPinningEnabled = false; + } +#endif #ifndef ZT_SDK const std::string up(OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT)); @@ -2877,16 +2932,19 @@ public: // Handlers for Node and Phy<> callbacks // ========================================================================= - inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len) + + + + inline void phyOnDatagram(PhySocket* sock, void** uptr, const struct sockaddr* localAddr, const struct sockaddr* from, void* data, unsigned long len) { if (_forceTcpRelay) { return; } - Metrics::udp_recv += len; + Metrics::udp_recv += len; const uint64_t now = OSUtils::now(); - if ((len >= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { + if ((len >= 16) && (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { _lastDirectReceiveFromGlobal = now; - } + } const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); if (ZT_ResultCode_isFatal(rc)) { char tmp[256]; @@ -2898,6 +2956,7 @@ public: } } + inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) { if (!success) { @@ -3116,6 +3175,8 @@ public: n.setTap(EthernetTap::newInstance( nullptr, + _concurrency, + _cpuPinningEnabled, _homePath.c_str(), MAC(nwc->mac), nwc->mtu, @@ -3630,8 +3691,9 @@ public: inline void nodeVirtualNetworkFrameFunction(uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { NetworkState *n = reinterpret_cast(*nuptr); - if ((!n)||(!n->tap())) + if ((!n)||(!n->tap())) { return; + } n->tap()->put(MAC(sourceMac),MAC(destMac),etherType,data,len); } diff --git a/windows/ZeroTierOne/ZeroTierOne.vcxproj b/windows/ZeroTierOne/ZeroTierOne.vcxproj index fcd8b56e2..2dd05aa57 100644 --- a/windows/ZeroTierOne/ZeroTierOne.vcxproj +++ b/windows/ZeroTierOne/ZeroTierOne.vcxproj @@ -88,6 +88,7 @@ +