From d6a1868d0a9b1d65417b87a20a4e234903d290f1 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 18 Mar 2016 14:16:07 -0700 Subject: [PATCH] Refactor incoming packet (rxQueue/fragmentQueue) to eliminate variable length queues and merge queues. This is both faster and saves memory. --- node/Constants.hpp | 7 +- node/DeferredPackets.cpp | 48 +++-- node/DeferredPackets.hpp | 13 +- node/IncomingPacket.hpp | 47 ++++- node/Switch.cpp | 421 ++++++++++++++++++++------------------- node/Switch.hpp | 39 ++-- 6 files changed, 317 insertions(+), 258 deletions(-) diff --git a/node/Constants.hpp b/node/Constants.hpp index 19ac3619d..12a052b91 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -163,9 +163,12 @@ #define ZT_MAX_PACKET_FRAGMENTS 4 /** - * Timeout for receipt of fragmented packets in ms + * Size of RX queue + * + * This is about 2mb, and can be decreased for small devices. A queue smaller + * than about 4 is probably going to cause a lot of lost packets. */ -#define ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT 500 +#define ZT_RX_QUEUE_SIZE 64 /** * Length of secret key in bytes -- 256-bit -- do not change diff --git a/node/DeferredPackets.cpp b/node/DeferredPackets.cpp index c8e63fc84..192b40787 100644 --- a/node/DeferredPackets.cpp +++ b/node/DeferredPackets.cpp @@ -26,8 +26,6 @@ namespace ZeroTier { DeferredPackets::DeferredPackets(const RuntimeEnvironment *renv) : RR(renv), - _readPtr(0), - _writePtr(0), _waiting(0), _die(false) { @@ -37,39 +35,45 @@ DeferredPackets::~DeferredPackets() { _q_m.lock(); _die = true; - while (_waiting > 0) { - _q_m.unlock(); + _q_m.unlock(); + + for(;;) { _q_s.post(); + _q_m.lock(); + if (_waiting <= 0) { + _q_m.unlock(); + break; + } else { + _q_m.unlock(); + } } } bool DeferredPackets::enqueue(IncomingPacket *pkt) { - _q_m.lock(); - const unsigned long p = _writePtr % ZT_DEFFEREDPACKETS_MAX; - if (_q[p]) { - _q_m.unlock(); - return false; - } else { - _q[p].setToUnsafe(pkt); - ++_writePtr; - _q_m.unlock(); - _q_s.post(); - 
return true; + { + Mutex::Lock _l(_q_m); + if (_q.size() >= ZT_DEFFEREDPACKETS_MAX) + return false; + _q.push_back(*pkt); } + _q_s.post(); + return true; } int DeferredPackets::process() { - SharedPtr<IncomingPacket> pkt; + std::list<IncomingPacket> pkt; _q_m.lock(); + if (_die) { _q_m.unlock(); return -1; } - while (_readPtr == _writePtr) { + + while (_q.empty()) { ++_waiting; _q_m.unlock(); _q_s.wait(); @@ -80,10 +84,16 @@ int DeferredPackets::process() return -1; } } - pkt.swap(_q[_readPtr++ % ZT_DEFFEREDPACKETS_MAX]); + + // Move item from _q list to a dummy list here to avoid copying packet + pkt.splice(pkt.end(),_q,_q.begin()); + _q_m.unlock(); - pkt->tryDecode(RR,true); + try { + pkt.front().tryDecode(RR,true); + } catch ( ... ) {} // drop invalids + return 1; } diff --git a/node/DeferredPackets.hpp b/node/DeferredPackets.hpp index 5ba265314..a98553969 100644 --- a/node/DeferredPackets.hpp +++ b/node/DeferredPackets.hpp @@ -19,6 +19,8 @@ #ifndef ZT_DEFERREDPACKETS_HPP #define ZT_DEFERREDPACKETS_HPP +#include <list> + #include "Constants.hpp" #include "SharedPtr.hpp" #include "Mutex.hpp" @@ -28,7 +30,7 @@ /** * Maximum number of deferred packets */ -#define ZT_DEFFEREDPACKETS_MAX 1024 +#define ZT_DEFFEREDPACKETS_MAX 256 namespace ZeroTier { @@ -53,11 +55,6 @@ public: /** * Enqueue a packet * - * Since packets enqueue themselves, they call it with 'this' and we wrap - * them in a SharedPtr<>. This is safe as SharedPtr<> is introspective and - * supports this. This should not be called from any other code outside - * IncomingPacket. 
- * * @param pkt Packet to process later (possibly in the background) * @return False if queue is full */ @@ -75,10 +72,8 @@ public: int process(); private: - SharedPtr<IncomingPacket> _q[ZT_DEFFEREDPACKETS_MAX]; + std::list<IncomingPacket> _q; const RuntimeEnvironment *const RR; - unsigned long _readPtr; - unsigned long _writePtr; volatile int _waiting; volatile bool _die; Mutex _q_m; diff --git a/node/IncomingPacket.hpp b/node/IncomingPacket.hpp index 0df200342..96e46c002 100644 --- a/node/IncomingPacket.hpp +++ b/node/IncomingPacket.hpp @@ -24,8 +24,6 @@ #include "Packet.hpp" #include "InetAddress.hpp" #include "Utils.hpp" -#include "SharedPtr.hpp" -#include "AtomicCounter.hpp" #include "MulticastGroup.hpp" #include "Peer.hpp" @@ -55,9 +53,21 @@ class Network; */ class IncomingPacket : public Packet { - friend class SharedPtr<IncomingPacket>; - public: + IncomingPacket() : + Packet(), + _receiveTime(0), + _localAddress(), + _remoteAddress() + { + } + + IncomingPacket(const IncomingPacket &p) + { + // All fields including InetAddress are memcpy'able + memcpy(this,&p,sizeof(IncomingPacket)); + } + /** * Create a new packet-in-decode * @@ -72,11 +82,35 @@ public: Packet(data,len), _receiveTime(now), _localAddress(localAddress), - _remoteAddress(remoteAddress), - __refCount() + _remoteAddress(remoteAddress) { } + inline IncomingPacket &operator=(const IncomingPacket &p) + { + // All fields including InetAddress are memcpy'able + memcpy(this,&p,sizeof(IncomingPacket)); + return *this; + } + + /** + * Init packet-in-decode in place + * + * @param data Packet data + * @param len Packet length + * @param localAddress Local interface address + * @param remoteAddress Address from which packet came + * @param now Current time + * @throws std::out_of_range Range error processing packet + */ + inline void init(const void *data,unsigned int len,const InetAddress &localAddress,const InetAddress &remoteAddress,uint64_t now) + { + copyFrom(data,len); + _receiveTime = now; + _localAddress = localAddress; + _remoteAddress = 
remoteAddress; + } + /** * Attempt to decode this packet * @@ -154,7 +188,6 @@ private: uint64_t _receiveTime; InetAddress _localAddress; InetAddress _remoteAddress; - AtomicCounter __refCount; }; } // namespace ZeroTier diff --git a/node/Switch.cpp b/node/Switch.cpp index 4e20300df..181bbf228 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -60,7 +60,6 @@ Switch::Switch(const RuntimeEnvironment *renv) : RR(renv), _lastBeaconResponse(0), _outstandingWhoisRequests(32), - _defragQueue(32), _lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine { } @@ -72,11 +71,14 @@ Switch::~Switch() void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) { try { + const uint64_t now = RR->node->now(); + if (len == 13) { /* LEGACY: before VERB_PUSH_DIRECT_PATHS, peers used broadcast * announcements on the LAN to solve the 'same network problem.' We * no longer send these, but we'll listen for them for a while to * locate peers with versions <1.0.4. 
*/ + Address beaconAddr(reinterpret_cast<const char *>(data) + 8,5); if (beaconAddr == RR->identity.address()) return; @@ -84,7 +86,6 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from return; SharedPtr<Peer> peer(RR->topology->getPeer(beaconAddr)); if (peer) { // we'll only respond to beacons from known peers - const uint64_t now = RR->node->now(); if ((now - _lastBeaconResponse) >= 2500) { // limit rate of responses _lastBeaconResponse = now; Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NOP); @@ -92,11 +93,209 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from RR->node->putPacket(localAddr,fromAddr,outp.data(),outp.size()); } } - } else if (len > ZT_PROTO_MIN_FRAGMENT_LENGTH) { - if (((const unsigned char *)data)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) { - _handleRemotePacketFragment(localAddr,fromAddr,data,len); - } else if (len >= ZT_PROTO_MIN_PACKET_LENGTH) { - _handleRemotePacketHead(localAddr,fromAddr,data,len); + + } else if (len > ZT_PROTO_MIN_FRAGMENT_LENGTH) { // min length check is important! + if (reinterpret_cast<const uint8_t *>(data)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) { + // Handle fragment ---------------------------------------------------- + + Packet::Fragment fragment(data,len); + Address destination(fragment.destination()); + + if (destination != RR->identity.address()) { + // Fragment is not for us, so try to relay it + if (fragment.hops() < ZT_RELAY_MAX_HOPS) { + fragment.incrementHops(); + + // Note: we don't bother initiating NAT-t for fragments, since heads will set that off. + // It wouldn't hurt anything, just redundant and unnecessary. 
+ SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); + if ((!relayTo)||(!relayTo->send(fragment.data(),fragment.size(),now))) { + #ifdef ZT_ENABLE_CLUSTER + if (RR->cluster) { + RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false); + return; + } + #endif + + // Don't know peer or no direct path -- so relay via root server + relayTo = RR->topology->getBestRoot(); + if (relayTo) + relayTo->send(fragment.data(),fragment.size(),now); + } + } else { + TRACE("dropped relay [fragment](%s) -> %s, max hops exceeded",fromAddr.toString().c_str(),destination.toString().c_str()); + } + } else { + // Fragment looks like ours + const uint64_t fragmentPacketId = fragment.packetId(); + const unsigned int fragmentNumber = fragment.fragmentNumber(); + const unsigned int totalFragments = fragment.totalFragments(); + + if ((totalFragments <= ZT_MAX_PACKET_FRAGMENTS)&&(fragmentNumber < ZT_MAX_PACKET_FRAGMENTS)&&(fragmentNumber > 0)&&(totalFragments > 1)) { + // Fragment appears basically sane. Its fragment number must be + // 1 or more, since a Packet with fragmented bit set is fragment 0. + // Total fragments must be more than 1, otherwise why are we + // seeing a Packet::Fragment? + + Mutex::Lock _l(_rxQueue_m); + RXQueueEntry *const rq = _findRXQueueEntry(fragmentPacketId); + + if ((!rq->timestamp)||(rq->packetId != fragmentPacketId)) { + // No packet found, so we received a fragment without its head. 
+ //TRACE("fragment (%u/%u) of %.16llx from %s",fragmentNumber + 1,totalFragments,fragmentPacketId,fromAddr.toString().c_str()); + + rq->timestamp = now; + rq->packetId = fragmentPacketId; + rq->frags[fragmentNumber - 1] = fragment; + rq->totalFragments = totalFragments; // total fragment count is known + rq->haveFragments = 1 << fragmentNumber; // we have only this fragment + rq->complete = false; + } else if (!(rq->haveFragments & (1 << fragmentNumber))) { + // We have other fragments and maybe the head, so add this one and check + //TRACE("fragment (%u/%u) of %.16llx from %s",fragmentNumber + 1,totalFragments,fragmentPacketId,fromAddr.toString().c_str()); + + rq->frags[fragmentNumber - 1] = fragment; + rq->totalFragments = totalFragments; + + if (Utils::countBits(rq->haveFragments |= (1 << fragmentNumber)) == totalFragments) { + // We have all fragments -- assemble and process full Packet + //TRACE("packet %.16llx is complete, assembling and processing...",fragmentPacketId); + + for(unsigned int f=1;f<totalFragments;++f) + rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength()); + + if (rq->frag0.tryDecode(RR,false)) { + rq->timestamp = 0; // packet decoded, free entry + } else { + rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something + } + } + } // else this is a duplicate fragment, ignore + } + } + + // -------------------------------------------------------------------- + } else if (len >= ZT_PROTO_MIN_PACKET_LENGTH) { // min length check is important! 
+ // Handle packet head ------------------------------------------------- + + // See packet format in Packet.hpp to understand this + const uint64_t packetId = ( + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[0]) << 56) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[1]) << 48) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[2]) << 40) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[3]) << 32) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[4]) << 24) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[5]) << 16) | + (((uint64_t)reinterpret_cast<const uint8_t *>(data)[6]) << 8) | + ((uint64_t)reinterpret_cast<const uint8_t *>(data)[7]) + ); + const Address destination(reinterpret_cast<const uint8_t *>(data) + 8,ZT_ADDRESS_LENGTH); + const Address source(reinterpret_cast<const uint8_t *>(data) + 13,ZT_ADDRESS_LENGTH); + + // Catch this and toss it -- it would never work, but it could happen if we somehow + // mistakenly guessed an address we're bound to as a destination for another peer. + if (source == RR->identity.address()) + return; + + //TRACE("<< %.16llx %s -> %s (size: %u)",(unsigned long long)packet->packetId(),source.toString().c_str(),destination.toString().c_str(),packet->size()); + + if (destination != RR->identity.address()) { + Packet packet(data,len); + + // Packet is not for us, so try to relay it + if (packet.hops() < ZT_RELAY_MAX_HOPS) { + packet.incrementHops(); + + SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); + if ((relayTo)&&((relayTo->send(packet.data(),packet.size(),now)))) { + Mutex::Lock _l(_lastUniteAttempt_m); + uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)]; + if ((now - luts) >= ZT_MIN_UNITE_INTERVAL) { + luts = now; + unite(source,destination); + } + } else { + #ifdef ZT_ENABLE_CLUSTER + if (RR->cluster) { + bool shouldUnite; + { + Mutex::Lock _l(_lastUniteAttempt_m); + uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)]; + shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL); + if (shouldUnite) + luts = now; + } + RR->cluster->sendViaCluster(source,destination,packet.data(),packet.size(),shouldUnite); + 
return; + } + #endif + + relayTo = RR->topology->getBestRoot(&source,1,true); + if (relayTo) + relayTo->send(packet.data(),packet.size(),now); + } + } else { + TRACE("dropped relay %s(%s) -> %s, max hops exceeded",packet.source().toString().c_str(),fromAddr.toString().c_str(),destination.toString().c_str()); + } + } else if ((reinterpret_cast<const uint8_t *>(data)[ZT_PACKET_IDX_FLAGS] & ZT_PROTO_FLAG_FRAGMENTED) != 0) { + // Packet is the head of a fragmented packet series + + Mutex::Lock _l(_rxQueue_m); + RXQueueEntry *const rq = _findRXQueueEntry(packetId); + + if ((!rq->timestamp)||(rq->packetId != packetId)) { + // If we have no other fragments yet, create an entry and save the head + //TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str()); + + rq->timestamp = now; + rq->packetId = packetId; + rq->frag0.init(data,len,localAddr,fromAddr,now); + rq->totalFragments = 0; + rq->haveFragments = 1; + rq->complete = false; + } else if (!(rq->haveFragments & 1)) { + // If we have other fragments but no head, see if we are complete with the head + + if ((rq->totalFragments > 1)&&(Utils::countBits(rq->haveFragments |= 1) == rq->totalFragments)) { + // We have all fragments -- assemble and process full Packet + //TRACE("packet %.16llx is complete, assembling and processing...",pid); + + rq->frag0.init(data,len,localAddr,fromAddr,now); + for(unsigned int f=1;f<rq->totalFragments;++f) + rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength()); + + if (rq->frag0.tryDecode(RR,false)) { + rq->timestamp = 0; // packet decoded, free entry + } else { + rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something + } + } else { + // Still waiting on more fragments, but keep the head + rq->frag0.init(data,len,localAddr,fromAddr,now); + } + } // else this is a duplicate head, ignore + } else { + // Packet is unfragmented, so just process it + IncomingPacket packet(data,len,localAddr,fromAddr,now); + if 
(!packet.tryDecode(RR,false)) { + Mutex::Lock _l(_rxQueue_m); + RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]); + unsigned long i = ZT_RX_QUEUE_SIZE - 1; + while ((i)&&(rq->timestamp)) { + RXQueueEntry *tmp = &(_rxQueue[--i]); + if (tmp->timestamp < rq->timestamp) + rq = tmp; + } + rq->timestamp = now; + rq->packetId = packetId; + rq->frag0 = packet; + rq->totalFragments = 1; + rq->haveFragments = 1; + rq->complete = true; + } + } + + // -------------------------------------------------------------------- } } } catch (std::exception &ex) { @@ -451,10 +650,13 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer) { // finish processing any packets waiting on peer's public key / identity Mutex::Lock _l(_rxQueue_m); - for(std::list< SharedPtr<IncomingPacket> >::iterator rxi(_rxQueue.begin());rxi!=_rxQueue.end();) { - if ((*rxi)->tryDecode(RR,false)) - _rxQueue.erase(rxi++); - else ++rxi; + unsigned long i = ZT_RX_QUEUE_SIZE; + while (i) { + RXQueueEntry *rq = &(_rxQueue[--i]); + if ((rq->timestamp)&&(rq->complete)) { + if (rq->frag0.tryDecode(RR,false)) + rq->timestamp = 0; + } } } @@ -546,29 +748,6 @@ unsigned long Switch::doTimerTasks(uint64_t now) } } - { // Time out RX queue packets that never got WHOIS lookups or other info. - Mutex::Lock _l(_rxQueue_m); - for(std::list< SharedPtr<IncomingPacket> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) { - if ((now - (*i)->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) { - TRACE("RX %s -> %s timed out",(*i)->source().toString().c_str(),(*i)->destination().toString().c_str()); - _rxQueue.erase(i++); - } else ++i; - } - } - - { // Time out packets that didn't get all their fragments. 
- Mutex::Lock _l(_defragQueue_m); - Hashtable< uint64_t,DefragQueueEntry >::Iterator i(_defragQueue); - uint64_t *packetId = (uint64_t *)0; - DefragQueueEntry *qe = (DefragQueueEntry *)0; - while (i.next(packetId,qe)) { - if ((now - qe->creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) { - TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",*packetId); - _defragQueue.erase(*packetId); - } - } - } - { // Remove really old last unite attempt entries to keep table size controlled Mutex::Lock _l(_lastUniteAttempt_m); Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt); @@ -583,180 +762,6 @@ unsigned long Switch::doTimerTasks(uint64_t now) return nextDelay; } -void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) -{ - Packet::Fragment fragment(data,len); - Address destination(fragment.destination()); - - if (destination != RR->identity.address()) { - // Fragment is not for us, so try to relay it - if (fragment.hops() < ZT_RELAY_MAX_HOPS) { - fragment.incrementHops(); - - // Note: we don't bother initiating NAT-t for fragments, since heads will set that off. - // It wouldn't hurt anything, just redundant and unnecessary. 
- SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); - if ((!relayTo)||(!relayTo->send(fragment.data(),fragment.size(),RR->node->now()))) { -#ifdef ZT_ENABLE_CLUSTER - if (RR->cluster) { - RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false); - return; - } -#endif - - // Don't know peer or no direct path -- so relay via root server - relayTo = RR->topology->getBestRoot(); - if (relayTo) - relayTo->send(fragment.data(),fragment.size(),RR->node->now()); - } - } else { - TRACE("dropped relay [fragment](%s) -> %s, max hops exceeded",fromAddr.toString().c_str(),destination.toString().c_str()); - } - } else { - // Fragment looks like ours - uint64_t pid = fragment.packetId(); - unsigned int fno = fragment.fragmentNumber(); - unsigned int tf = fragment.totalFragments(); - - if ((tf <= ZT_MAX_PACKET_FRAGMENTS)&&(fno < ZT_MAX_PACKET_FRAGMENTS)&&(fno > 0)&&(tf > 1)) { - // Fragment appears basically sane. Its fragment number must be - // 1 or more, since a Packet with fragmented bit set is fragment 0. - // Total fragments must be more than 1, otherwise why are we - // seeing a Packet::Fragment? 
- - Mutex::Lock _l(_defragQueue_m); - DefragQueueEntry &dq = _defragQueue[pid]; - - if (!dq.creationTime) { - // We received a Packet::Fragment without its head, so queue it and wait - - dq.creationTime = RR->node->now(); - dq.frags[fno - 1] = fragment; - dq.totalFragments = tf; // total fragment count is known - dq.haveFragments = 1 << fno; // we have only this fragment - //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str()); - } else if (!(dq.haveFragments & (1 << fno))) { - // We have other fragments and maybe the head, so add this one and check - - dq.frags[fno - 1] = fragment; - dq.totalFragments = tf; - //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str()); - - if (Utils::countBits(dq.haveFragments |= (1 << fno)) == tf) { - // We have all fragments -- assemble and process full Packet - //TRACE("packet %.16llx is complete, assembling and processing...",pid); - - SharedPtr<IncomingPacket> packet(dq.frag0); - for(unsigned int f=1;f<tf;++f) - packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength()); - _defragQueue.erase(pid); // dq no longer valid after this - - if (!packet->tryDecode(RR,false)) { - Mutex::Lock _l(_rxQueue_m); - _rxQueue.push_back(packet); - } - } - } // else this is a duplicate fragment, ignore - } - } -} - -void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len) -{ - const uint64_t now = RR->node->now(); - SharedPtr<IncomingPacket> packet(new IncomingPacket(data,len,localAddr,fromAddr,now)); - - Address source(packet->source()); - Address destination(packet->destination()); - - // Catch this and toss it -- it would never work, but it could happen if we somehow - // mistakenly guessed an address we're bound to as a destination for another peer. 
- if (source == RR->identity.address()) - return; - - //TRACE("<< %.16llx %s -> %s (size: %u)",(unsigned long long)packet->packetId(),source.toString().c_str(),destination.toString().c_str(),packet->size()); - - if (destination != RR->identity.address()) { - // Packet is not for us, so try to relay it - if (packet->hops() < ZT_RELAY_MAX_HOPS) { - packet->incrementHops(); - - SharedPtr<Peer> relayTo = RR->topology->getPeer(destination); - if ((relayTo)&&((relayTo->send(packet->data(),packet->size(),now)))) { - Mutex::Lock _l(_lastUniteAttempt_m); - uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)]; - if ((now - luts) >= ZT_MIN_UNITE_INTERVAL) { - luts = now; - unite(source,destination); - } - } else { -#ifdef ZT_ENABLE_CLUSTER - if (RR->cluster) { - bool shouldUnite; - { - Mutex::Lock _l(_lastUniteAttempt_m); - uint64_t &luts = _lastUniteAttempt[_LastUniteKey(source,destination)]; - shouldUnite = ((now - luts) >= ZT_MIN_UNITE_INTERVAL); - if (shouldUnite) - luts = now; - } - RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size(),shouldUnite); - return; - } -#endif - - relayTo = RR->topology->getBestRoot(&source,1,true); - if (relayTo) - relayTo->send(packet->data(),packet->size(),now); - } - } else { - TRACE("dropped relay %s(%s) -> %s, max hops exceeded",packet->source().toString().c_str(),fromAddr.toString().c_str(),destination.toString().c_str()); - } - } else if (packet->fragmented()) { - // Packet is the head of a fragmented packet series - - uint64_t pid = packet->packetId(); - Mutex::Lock _l(_defragQueue_m); - DefragQueueEntry &dq = _defragQueue[pid]; - - if (!dq.creationTime) { - // If we have no other fragments yet, create an entry and save the head - - dq.creationTime = now; - dq.frag0 = packet; - dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment - dq.haveFragments = 1; // head is first bit (left to right) - //TRACE("fragment (0/?) 
of %.16llx from %s",pid,fromAddr.toString().c_str()); - } else if (!(dq.haveFragments & 1)) { - // If we have other fragments but no head, see if we are complete with the head - - if ((dq.totalFragments)&&(Utils::countBits(dq.haveFragments |= 1) == dq.totalFragments)) { - // We have all fragments -- assemble and process full Packet - - //TRACE("packet %.16llx is complete, assembling and processing...",pid); - // packet already contains head, so append fragments - for(unsigned int f=1;f<dq.totalFragments;++f) - packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength()); - _defragQueue.erase(pid); // dq no longer valid after this - - if (!packet->tryDecode(RR,false)) { - Mutex::Lock _l(_rxQueue_m); - _rxQueue.push_back(packet); - } - } else { - // Still waiting on more fragments, so queue the head - dq.frag0 = packet; - } - } // else this is a duplicate head, ignore - } else { - // Packet is unfragmented, so just process it - if (!packet->tryDecode(RR,false)) { - Mutex::Lock _l(_rxQueue_m); - _rxQueue.push_back(packet); - } - } -} - Address Switch::_sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted) { SharedPtr<Peer> root(RR->topology->getBestRoot(peersAlreadyConsulted,numPeersAlreadyConsulted,false)); diff --git a/node/Switch.hpp b/node/Switch.hpp index f77bf86cc..3ba2736db 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -150,8 +150,6 @@ public: unsigned long doTimerTasks(uint64_t now); private: - void _handleRemotePacketFragment(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len); - void _handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len); Address _sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted); bool _trySend(const Packet &packet,bool encrypt,uint64_t nwid); @@ -169,23 +167,38 @@ private: Hashtable< Address,WhoisRequest > _outstandingWhoisRequests; Mutex 
_outstandingWhoisRequests_m; - // Packet defragmentation queue -- comes before RX queue in path - struct DefragQueueEntry + // Packets waiting for WHOIS replies or other decode info or missing fragments + struct RXQueueEntry { - DefragQueueEntry() : creationTime(0),totalFragments(0),haveFragments(0) {} - uint64_t creationTime; - SharedPtr<IncomingPacket> frag0; - Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1]; + RXQueueEntry() : timestamp(0) {} + uint64_t timestamp; // 0 if entry is not in use + uint64_t packetId; + IncomingPacket frag0; // head of packet + Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1]; // later fragments (if any) unsigned int totalFragments; // 0 if only frag0 received, waiting for frags uint32_t haveFragments; // bit mask, LSB to MSB + bool complete; // if true, packet is complete }; - Hashtable< uint64_t,DefragQueueEntry > _defragQueue; - Mutex _defragQueue_m; - - // ZeroTier-layer RX queue of incoming packets in the process of being decoded - std::list< SharedPtr<IncomingPacket> > _rxQueue; + RXQueueEntry _rxQueue[ZT_RX_QUEUE_SIZE]; Mutex _rxQueue_m; + /* Returns the matching or oldest entry. Caller must check timestamp and + * packet ID to determine which. */ + inline RXQueueEntry *_findRXQueueEntry(uint64_t packetId) + { + RXQueueEntry *rq; + RXQueueEntry *oldest = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]); + unsigned long i = ZT_RX_QUEUE_SIZE; + while (i) { + rq = &(_rxQueue[--i]); + if (rq->timestamp < oldest->timestamp) + oldest = rq; + if ((rq->packetId == packetId)&&(rq->timestamp)) + return rq; + } + return oldest; + } + // ZeroTier-layer TX queue entry struct TXQueueEntry {