From 8f5cc4ed333905fd15b62418a64815fe8a3f4068 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Fri, 6 Sep 2024 15:19:06 -0700 Subject: [PATCH] Completely remove vestigial RX code --- service/OneService.cpp | 97 ++++-------------------------------------- 1 file changed, 8 insertions(+), 89 deletions(-) diff --git a/service/OneService.cpp b/service/OneService.cpp index 1544b8150..b607f014a 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -996,60 +996,6 @@ public: #endif _node->initMultithreading(_concurrency, _cpuPinningEnabled); bool pinning = _cpuPinningEnabled; - -/* - fprintf(stderr, "Starting %d RX threads\n", _concurrency); - for (unsigned int i = 0; i < _concurrency; ++i) { - _rxPacketThreads.push_back(std::thread([this, i, pinning]() { - - if (pinning) { -#if defined(__LINUX__) || defined(__FreeBSD__) // || defined(__APPLE__) - int pinCore = i % _concurrency; - fprintf(stderr, "CPU Pinning enabled. Pinning thread %d to core %d\n", i, pinCore); - pthread_t self = pthread_self(); - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(pinCore, &cpuset); -#endif -#ifdef __LINUX__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); -#elif __FreeBSD__ - int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); -#endif -#if defined(__LINUX__) || defined(__FreeBSD__) // || defined(__APPLE__) - if (rc != 0) - { - fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno)); - exit(1); - } -#endif - } - PacketRecord* packet = nullptr; - for (;;) { - if (! _rxPacketQueue.get(packet)) { - break; - } - if (! packet) { - break; - } - const ZT_ResultCode err = _node->processWirePacket(nullptr, packet->now, packet->sock, &(packet->from), packet->data, packet->size, &_nextBackgroundTaskDeadline); - { - Mutex::Lock l(_rxPacketVector_m); - _rxPacketVector.push_back(packet); - } - if (ZT_ResultCode_isFatal(err)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - break; - } - } - })); - } - */ } virtual ReasonForTermination run() @@ -2999,42 +2945,15 @@ public: if ((len >= 16) && (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { _lastDirectReceiveFromGlobal = now; } - /* -#if defined(__LINUX__) || defined(__FreeBSD__) - if (_multicoreEnabled) { - PacketRecord* packet; - _rxPacketVector_m.lock(); - if (_rxPacketVector.empty()) { - packet = new PacketRecord; - } - else { - packet = _rxPacketVector.back(); - _rxPacketVector.pop_back(); - } - _rxPacketVector_m.unlock(); - - packet->sock = reinterpret_cast(sock); - packet->now = now; - memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); - packet->size = (unsigned int)len; - memcpy(packet->data, data, len); - _rxPacketQueue.postLimit(packet, 2048 * _concurrency); + const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); + if (ZT_ResultCode_isFatal(rc)) { + char tmp[256]; + OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); } - else { -#endif -*/ - const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); - if (ZT_ResultCode_isFatal(rc)) { - char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - } -//#if defined(__LINUX__) || defined(__FreeBSD__) -// } -//#endif }