diff --git a/node/PacketMultiplexer.cpp b/node/PacketMultiplexer.cpp index 9035ccc0c..a1dc835a1 100644 --- a/node/PacketMultiplexer.cpp +++ b/node/PacketMultiplexer.cpp @@ -62,7 +62,7 @@ void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const memcpy(packet->data, data, len); int bucket = flowId % _concurrency; - _rxPacketQueues[bucket]->postLimit(packet, 256); + _rxPacketQueues[bucket]->postLimit(packet, 2048); } void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled) diff --git a/service/OneService.cpp b/service/OneService.cpp index ff79c6f85..1544b8150 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -997,12 +997,13 @@ public: _node->initMultithreading(_concurrency, _cpuPinningEnabled); bool pinning = _cpuPinningEnabled; +/* fprintf(stderr, "Starting %d RX threads\n", _concurrency); for (unsigned int i = 0; i < _concurrency; ++i) { _rxPacketThreads.push_back(std::thread([this, i, pinning]() { if (pinning) { -#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ +#if defined(__LINUX__) || defined(__FreeBSD__) // || defined(__APPLE__) int pinCore = i % _concurrency; fprintf(stderr, "CPU Pinning enabled. Pinning thread %d to core %d\n", i, pinCore); pthread_t self = pthread_self(); @@ -1015,7 +1016,7 @@ public: #elif __FreeBSD__ int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset); #endif -#if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */ +#if defined(__LINUX__) || defined(__FreeBSD__) // || defined(__APPLE__) if (rc != 0) { fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno)); @@ -1048,6 +1049,7 @@ public: } })); } + */ } virtual ReasonForTermination run() @@ -2997,6 +2999,7 @@ public: if ((len >= 16) && (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { _lastDirectReceiveFromGlobal = now; } + /* #if defined(__LINUX__) || defined(__FreeBSD__) if (_multicoreEnabled) { PacketRecord* packet; @@ -3015,10 +3018,11 @@ public: memcpy(&(packet->from), from, sizeof(struct sockaddr_storage)); packet->size = (unsigned int)len; memcpy(packet->data, data, len); - _rxPacketQueue.postLimit(packet, 256 * _concurrency); + _rxPacketQueue.postLimit(packet, 2048 * _concurrency); } else { #endif +*/ const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast(sock),reinterpret_cast(from),data,len,&_nextBackgroundTaskDeadline); if (ZT_ResultCode_isFatal(rc)) { char tmp[256]; @@ -3028,9 +3032,9 @@ public: _fatalErrorMessage = tmp; this->terminate(); } -#if defined(__LINUX__) || defined(__FreeBSD__) - } -#endif +//#if defined(__LINUX__) || defined(__FreeBSD__) +// } +//#endif }