diff --git a/node/Hashtable.hpp b/node/Hashtable.hpp index 59a577262..29c548387 100644 --- a/node/Hashtable.hpp +++ b/node/Hashtable.hpp @@ -372,9 +372,12 @@ private: } static inline unsigned long _hc(const uint64_t i) { - // NOTE: this is fine for network IDs, but might be bad for other kinds - // of IDs if they are not evenly or randomly distributed. - return (unsigned long)((i ^ (i >> 32)) * 2654435761ULL); + /* NOTE: this assumes that 'i' is evenly distributed, which is the case for + * packet IDs and network IDs -- the two use cases in ZT for uint64_t keys. + * These values are also greater than 0xffffffff so they'll map onto a full + * bucket count just fine no matter what happens. Normally you'd want to + * hash an integer key index in a hash table. */ + return (unsigned long)i; } inline void _grow() diff --git a/node/Switch.cpp b/node/Switch.cpp index 989f497a0..3dcb7002b 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -531,11 +531,14 @@ unsigned long Switch::doTimerTasks(uint64_t now) { // Time out packets that didn't get all their fragments. Mutex::Lock _l(_defragQueue_m); - for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) { - if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) { + Hashtable< uint64_t,DefragQueueEntry >::Iterator i(_defragQueue); + uint64_t *packetId = (uint64_t *)0; + DefragQueueEntry *qe = (DefragQueueEntry *)0; + while (i.next(packetId,qe)) { + if ((now - qe->creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) { TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first); - _defragQueue.erase(i++); - } else ++i; + _defragQueue.erase(*packetId); + } } } @@ -577,32 +580,31 @@ void Switch::_handleRemotePacketFragment(const InetAddress &fromAddr,const void // seeing a Packet::Fragment? Mutex::Lock _l(_defragQueue_m); - std::map< uint64_t,DefragQueueEntry >::iterator dqe(_defragQueue.find(pid)); + DefragQueueEntry &dq = _defragQueue[pid]; - if (dqe == _defragQueue.end()) { + if (!dq.creationTime) { // We received a Packet::Fragment without its head, so queue it and wait - DefragQueueEntry &dq = _defragQueue[pid]; dq.creationTime = RR->node->now(); dq.frags[fno - 1] = fragment; dq.totalFragments = tf; // total fragment count is known dq.haveFragments = 1 << fno; // we have only this fragment //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str()); - } else if (!(dqe->second.haveFragments & (1 << fno))) { + } else if (!(dq.haveFragments & (1 << fno))) { // We have other fragments and maybe the head, so add this one and check - dqe->second.frags[fno - 1] = fragment; - dqe->second.totalFragments = tf; + dq.frags[fno - 1] = fragment; + dq.totalFragments = tf; //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str()); - if (Utils::countBits(dqe->second.haveFragments |= (1 << fno)) == tf) { + if (Utils::countBits(dq.haveFragments |= (1 << fno)) == tf) { // We have all fragments -- assemble and process full Packet //TRACE("packet %.16llx is complete, assembling and processing...",pid); - SharedPtr packet(dqe->second.frag0); + SharedPtr packet(dq.frag0); for(unsigned int f=1;fappend(dqe->second.frags[f - 1].payload(),dqe->second.frags[f - 1].payloadLength()); - _defragQueue.erase(dqe); + packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength()); + _defragQueue.erase(pid); // dq no longer valid after this if (!packet->tryDecode(RR)) { Mutex::Lock _l(_rxQueue_m); @@ -645,26 +647,27 @@ void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *dat uint64_t pid = packet->packetId(); Mutex::Lock _l(_defragQueue_m); - std::map< uint64_t,DefragQueueEntry >::iterator dqe(_defragQueue.find(pid)); + DefragQueueEntry &dq = _defragQueue[pid]; - if (dqe == _defragQueue.end()) { + if (!dq.creationTime) { // If we have no other fragments yet, create an entry and save the head - DefragQueueEntry &dq = _defragQueue[pid]; + dq.creationTime = RR->node->now(); dq.frag0 = packet; dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment dq.haveFragments = 1; // head is first bit (left to right) //TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str()); - } else if (!(dqe->second.haveFragments & 1)) { + } else if (!(dq.haveFragments & 1)) { // If we have other fragments but no head, see if we are complete with the head - if ((dqe->second.totalFragments)&&(Utils::countBits(dqe->second.haveFragments |= 1) == dqe->second.totalFragments)) { + + if ((dq.totalFragments)&&(Utils::countBits(dq.haveFragments |= 1) == dq.totalFragments)) { // We have all fragments -- assemble and process full Packet //TRACE("packet %.16llx is complete, assembling and processing...",pid); // packet already contains head, so append fragments - for(unsigned int f=1;fsecond.totalFragments;++f) - packet->append(dqe->second.frags[f - 1].payload(),dqe->second.frags[f - 1].payloadLength()); - _defragQueue.erase(dqe); + for(unsigned int f=1;fappend(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength()); + _defragQueue.erase(pid); // dq no longer valid after this if (!packet->tryDecode(RR)) { Mutex::Lock _l(_rxQueue_m); @@ -672,7 +675,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *dat } } else { // Still waiting on more fragments, so queue the head - dqe->second.frag0 = packet; + dq.frag0 = packet; } } // else this is a duplicate head, ignore } else { diff --git a/node/Switch.hpp b/node/Switch.hpp index ac85606e2..a1b36014d 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -45,6 +45,7 @@ #include "Network.hpp" #include "SharedPtr.hpp" #include "IncomingPacket.hpp" +#include "Hashtable.hpp" /* Ethernet frame types that might be relevant to us */ #define ZT_ETHERTYPE_IPV4 0x0800 @@ -199,13 +200,14 @@ private: // Packet defragmentation queue -- comes before RX queue in path struct DefragQueueEntry { + DefragQueueEntry() : creationTime(0),totalFragments(0),haveFragments(0) {} uint64_t creationTime; SharedPtr frag0; Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1]; unsigned int totalFragments; // 0 if only frag0 received, waiting for frags uint32_t haveFragments; // bit mask, LSB to MSB }; - std::map< uint64_t,DefragQueueEntry > _defragQueue; + Hashtable< uint64_t,DefragQueueEntry > _defragQueue; Mutex _defragQueue_m; // ZeroTier-layer RX queue of incoming packets in the process of being decoded