From 7619b0ecbdb3485ff5f2531cea4b660e793b5467 Mon Sep 17 00:00:00 2001
From: Adam Ierymenko
Date: Fri, 21 Nov 2014 10:50:27 -0800
Subject: [PATCH] Send multicasts in random order.

This should not affect most users, but on large networks it should make
service announcements work a lot better. It is the result of a prolonged
discussion with a user about the visibility of game servers on a large
network. The old multicast algorithm was de facto randomized due to its
distributed nature, while the new algorithm is more deterministic. This
restores some randomization in limit-overflow conditions. It won't
affect small networks at all.
---
 node/IncomingPacket.cpp |   6 +-
 node/Multicaster.cpp    | 256 +++++++++++++++++++---------------------
 node/Multicaster.hpp    |  22 ++--
 3 files changed, 134 insertions(+), 150 deletions(-)
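Note: the core of this change is a Fisher-Yates shuffle over an array of member indexes, so that when a group has more members than the per-send limit, the subset that receives each multicast varies from send to send. A minimal standalone sketch of that idea follows; std::mt19937 and the sendTo callback are stand-ins for the node's internal PRNG (RR->prng->next32()) and send path, which this sketch does not reproduce.

// Sketch only, not ZeroTier's actual code: visit a member list in a
// fresh random order on every send.
#include <cstdint>
#include <functional>
#include <numeric>
#include <random>
#include <utility>
#include <vector>

void sendInRandomOrder(const std::vector<uint64_t> &members,
	unsigned int limit,
	const std::function<void (uint64_t)> &sendTo)
{
	if (members.empty())
		return;

	// Random permutation of member indexes (Fisher-Yates shuffle)
	std::vector<unsigned long> indexes(members.size());
	std::iota(indexes.begin(),indexes.end(),0UL); // 0,1,2,...
	std::mt19937 prng(std::random_device{}());
	for(unsigned long i=(unsigned long)indexes.size()-1;i>0;--i) {
		unsigned long j = prng() % (i + 1); // same modulo draw as the patch
		std::swap(indexes[i],indexes[j]);
	}

	// Walk the permutation; when members.size() exceeds limit, the
	// subset that actually receives the multicast now varies per send.
	unsigned int count = 0;
	for(unsigned long idx=0;(idx<indexes.size())&&(count<limit);++idx,++count)
		sendTo(members[indexes[idx]]);
}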
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp
index efda53708..ca72687e3 100644
--- a/node/IncomingPacket.cpp
+++ b/node/IncomingPacket.cpp
@@ -306,7 +306,7 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,const SharedPtr<Peer> &p
 			TRACE("%s(%s): OK(MULTICAST_GATHER) %.16llx/%s length %u",source().toString().c_str(),_remoteAddress.toString().c_str(),nwid,mg.toString().c_str(),size());
 			unsigned int count = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_GATHER__OK__IDX_GATHER_RESULTS + 4);
-			RR->mc->addMultiple(Utils::now(),nwid,mg,peer->address(),field(ZT_PROTO_VERB_MULTICAST_GATHER__OK__IDX_GATHER_RESULTS + 6,count * 5),count,at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER__OK__IDX_GATHER_RESULTS));
+			RR->mc->addMultiple(Utils::now(),nwid,mg,field(ZT_PROTO_VERB_MULTICAST_GATHER__OK__IDX_GATHER_RESULTS + 6,count * 5),count,at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER__OK__IDX_GATHER_RESULTS));
 		}	break;
 
 		case Packet::VERB_MULTICAST_FRAME: {
@@ -332,7 +332,7 @@
 					offset += ZT_PROTO_VERB_MULTICAST_FRAME__OK__IDX_COM_AND_GATHER_RESULTS;
 					unsigned int totalKnown = at<uint32_t>(offset); offset += 4;
 					unsigned int count = at<uint16_t>(offset); offset += 2;
-					RR->mc->addMultiple(Utils::now(),nwid,mg,peer->address(),field(offset,count * 5),count,totalKnown);
+					RR->mc->addMultiple(Utils::now(),nwid,mg,field(offset,count * 5),count,totalKnown);
 				}
 			}	break;
 
@@ -655,7 +655,7 @@ bool IncomingPacket::_doMULTICAST_LIKE(const RuntimeEnvironment *RR,const Shared
 		// Iterate through 18-byte network,MAC,ADI tuples
 		for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;ptr<size();ptr+=18)
-			RR->mc->add(now,at<uint64_t>(ptr),MulticastGroup(MAC(field(ptr + 8,6),6),at<uint32_t>(ptr + 14)),Address(),peer->address());
+			RR->mc->add(now,at<uint64_t>(ptr),MulticastGroup(MAC(field(ptr + 8,6),6),at<uint32_t>(ptr + 14)),peer->address());
 
 		peer->received(RR,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_MULTICAST_LIKE,0,Packet::VERB_NOP,now);
 	} catch (std::exception &ex) {
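Note: both OK handlers above hand addMultiple() a packed run of 5-byte big-endian ZeroTier addresses plus a count. A hedged sketch of that decoding; decodeAddresses() is hypothetical, and the real code constructs Address(p,5) in place rather than returning a vector.

// Sketch: decode a packed run of 5-byte big-endian addresses, as consumed
// by Multicaster::addMultiple(). uint64_t stands in for Address.
#include <cstdint>
#include <vector>

std::vector<uint64_t> decodeAddresses(const void *addresses,unsigned int count)
{
	const unsigned char *p = (const unsigned char *)addresses;
	const unsigned char *e = p + (5 * count); // same bounds math as addMultiple()
	std::vector<uint64_t> out;
	out.reserve(count);
	while (p != e) {
		uint64_t a = 0;
		for(unsigned int i=0;i<5;++i)
			a = (a << 8) | (uint64_t)p[i]; // big-endian: most significant byte first
		out.push_back(a);
		p += 5;
	}
	return out;
}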
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index aa9fef7ba..6c098cc10 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -52,18 +52,16 @@ Multicaster::~Multicaster()
 {
 }
 
-void Multicaster::addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const Address &learnedFrom,const void *addresses,unsigned int count,unsigned int totalKnown)
+void Multicaster::addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const void *addresses,unsigned int count,unsigned int totalKnown)
 {
 	const unsigned char *p = (const unsigned char *)addresses;
 	const unsigned char *e = p + (5 * count);
 	Mutex::Lock _l(_groups_m);
 	MulticastGroupStatus &gs = _groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)];
 	while (p != e) {
-		_add(now,nwid,mg,gs,learnedFrom,Address(p,5));
+		_add(now,nwid,mg,gs,Address(p,5));
 		p += 5;
 	}
-	if (RR->topology->isSupernode(learnedFrom))
-		gs.totalKnownMembers = totalKnown;
 }
 
 unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Packet &appendTo,unsigned int limit) const
@@ -160,123 +158,145 @@ void Multicaster::send(
 	const void *data,
 	unsigned int len)
 {
+	unsigned long idxbuf[8194];
+	unsigned long *indexes = idxbuf;
+
 	Mutex::Lock _l(_groups_m);
 	MulticastGroupStatus &gs = _groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)];
 
-	if (gs.members.size() >= limit) {
-		// If we already have enough members, just send and we're done. We can
-		// skip the TX queue and skip the overhead of maintaining a send log by
-		// using sendOnly().
-		OutboundMulticast out;
-
-		out.init(
-			RR,
-			now,
-			nwid,
-			com,
-			limit,
-			0,
-			src,
-			mg,
-			etherType,
-			data,
-			len);
-
-		unsigned int count = 0;
-
-		for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
-			{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
-				SharedPtr<Peer> p(RR->topology->getPeer(*ast));
-				if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
-					continue;
-			}
-
-			out.sendOnly(RR,*ast);
-			if (++count >= limit)
-				break;
-		}
-
-		if (count < limit) {
-			for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
-				{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
-					SharedPtr<Peer> p(RR->topology->getPeer(m->address));
-					if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
-						continue;
-				}
-
-				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end()) {
-					out.sendOnly(RR,m->address);
-					if (++count >= limit)
-						break;
-				}
-			}
-		}
-	} else {
-		unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
-
-		if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) {
-			gs.lastExplicitGather = now;
-			SharedPtr<Peer> sn(RR->topology->getBestSupernode());
-			if (sn) {
-				TRACE(">>MC GATHER up to %u in %.16llx/%s",gatherLimit,nwid,mg.toString().c_str());
-
-				Packet outp(sn->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
-				outp.append(nwid);
-				outp.append((uint8_t)0);
-				mg.mac().appendTo(outp);
-				outp.append((uint32_t)mg.adi());
-				outp.append((uint32_t)gatherLimit); // +1 just means we'll have an extra in the queue if available
-				outp.armor(sn->key(),true);
-				sn->send(RR,outp.data(),outp.size(),now);
-			}
-			gatherLimit = 0; // implicit not needed
-		}
-
-		gs.txQueue.push_back(OutboundMulticast());
-		OutboundMulticast &out = gs.txQueue.back();
-
-		out.init(
-			RR,
-			now,
-			nwid,
-			com,
-			limit,
-			gatherLimit,
-			src,
-			mg,
-			etherType,
-			data,
-			len);
-
-		unsigned int count = 0;
-
-		for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
-			{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
-				SharedPtr<Peer> p(RR->topology->getPeer(*ast));
-				if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
-					continue;
-			}
-
-			out.sendAndLog(RR,*ast);
-			if (++count >= limit)
-				break;
-		}
-
-		if (count < limit) {
-			for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
-				{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
-					SharedPtr<Peer> p(RR->topology->getPeer(m->address));
-					if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
-						continue;
-				}
-
-				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end()) {
-					out.sendAndLog(RR,m->address);
-					if (++count >= limit)
-						break;
-				}
-			}
-		}
-	}
+	if (!gs.members.empty()) {
+		// Use a stack-allocated buffer unless this multicast group is ridiculously huge
+		if (gs.members.size() > 8194)
+			indexes = new unsigned long[gs.members.size()];
+
+		// Generate a random permutation of member indexes
+		for(unsigned long i=0;i<gs.members.size();++i)
+			indexes[i] = i;
+		for(unsigned long i=(unsigned long)gs.members.size()-1;i>0;--i) {
+			unsigned long j = RR->prng->next32() % (i + 1);
+			unsigned long tmp = indexes[j];
+			indexes[j] = indexes[i];
+			indexes[i] = tmp;
+		}
+
+		if (gs.members.size() >= limit) {
+			// If we already have enough members, just send and we're done. We can
+			// skip the TX queue and skip the overhead of maintaining a send log by
+			// using sendOnly().
+			OutboundMulticast out;
+
+			out.init(
+				RR,
+				now,
+				nwid,
+				com,
+				limit,
+				2, // we'll still gather a little from peers to keep multicast list fresh
+				src,
+				mg,
+				etherType,
+				data,
+				len);
+
+			unsigned int count = 0;
+
+			for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
+				{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+					SharedPtr<Peer> p(RR->topology->getPeer(*ast));
+					if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+						continue;
+				}
+
+				out.sendOnly(RR,*ast);
+				if (++count >= limit)
+					break;
+			}
+
+			unsigned long idx = 0;
+			while (count < limit) {
+				const MulticastGroupMember &m = gs.members[indexes[idx++]];
+
+				{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+					SharedPtr<Peer> p(RR->topology->getPeer(m.address));
+					if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+						continue;
+				}
+
+				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) {
+					out.sendOnly(RR,m.address);
+					++count;
+				}
+			}
+		} else {
+			unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
+
+			if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) {
+				gs.lastExplicitGather = now;
+				SharedPtr<Peer> sn(RR->topology->getBestSupernode());
+				if (sn) {
+					TRACE(">>MC GATHER up to %u in %.16llx/%s",gatherLimit,nwid,mg.toString().c_str());
+
+					Packet outp(sn->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
+					outp.append(nwid);
+					outp.append((uint8_t)0);
+					mg.mac().appendTo(outp);
+					outp.append((uint32_t)mg.adi());
+					outp.append((uint32_t)gatherLimit); // +1 just means we'll have an extra in the queue if available
+					outp.armor(sn->key(),true);
+					sn->send(RR,outp.data(),outp.size(),now);
+				}
+				gatherLimit = 0; // don't need to gather from peers this time since we consulted the core
+			}
+
+			gs.txQueue.push_back(OutboundMulticast());
+			OutboundMulticast &out = gs.txQueue.back();
+
+			out.init(
+				RR,
+				now,
+				nwid,
+				com,
+				limit,
+				gatherLimit,
+				src,
+				mg,
+				etherType,
+				data,
+				len);
+
+			unsigned int count = 0;
+
+			for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
+				{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+					SharedPtr<Peer> p(RR->topology->getPeer(*ast));
+					if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+						continue;
+				}
+
+				out.sendAndLog(RR,*ast);
+				if (++count >= limit)
+					break;
+			}
+
+			unsigned long idx = 0;
+			while ((count < limit)&&(idx < gs.members.size())) {
+				const MulticastGroupMember &m = gs.members[indexes[idx++]];
+
+				{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+					SharedPtr<Peer> p(RR->topology->getPeer(m.address));
+					if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+						continue;
+				}
+
+				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) {
+					out.sendAndLog(RR,m.address);
+					++count;
+				}
+			}
+		}
+	}
+
+	if (indexes != idxbuf)
+		delete [] indexes;
 }
 
 // DEPRECATED / LEGACY / TODO:
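Note: one detail of the new send() worth calling out is its allocation strategy: indexes points at a fixed stack array and only falls back to the heap for groups larger than 8194 members. The pattern in isolation; the function name and elided body are illustrative assumptions.

// Sketch: small-buffer optimization used by send() above. A fixed stack
// array covers the common case; only oversized groups pay for new/delete.
#include <cstddef>

void withIndexBuffer(std::size_t memberCount)
{
	unsigned long idxbuf[8194];
	unsigned long *indexes = idxbuf;
	if (memberCount > 8194)
		indexes = new unsigned long[memberCount]; // rare oversized group

	// ... fill indexes with 0..memberCount-1, shuffle, and walk it as send() does ...

	if (indexes != idxbuf)
		delete [] indexes; // free only the heap fallback
}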
@@ -344,25 +364,6 @@ void Multicaster::clean(uint64_t now)
 			while (reader != mm->second.members.end()) {
 				if ((now - reader->timestamp) < ZT_MULTICAST_LIKE_EXPIRE) {
 					*writer = *reader;
-
-					/* We rank in ascending order of most recent relevant activity. For peers we've learned
-					 * about by direct LIKEs, we do this in order of their own activity. For indirectly
-					 * acquired peers we do this minus a constant to place these categorically below directly
-					 * learned peers. For peers with no active Peer record, we use the time we last learned
-					 * about them minus one day (a large constant) to put these at the bottom of the list.
-					 * List is sorted in ascending order of rank and multicasts are sent last-to-first. */
-					if (writer->learnedFrom != writer->address) {
-						SharedPtr<Peer> p(RR->topology->getPeer(writer->learnedFrom));
-						if (p)
-							writer->rank = (RR->topology->amSupernode() ? p->lastDirectReceive() : p->lastUnicastFrame()) - ZT_MULTICAST_LIKE_EXPIRE;
-						else writer->rank = writer->timestamp - (86400000 + ZT_MULTICAST_LIKE_EXPIRE);
-					} else {
-						SharedPtr<Peer> p(RR->topology->getPeer(writer->address));
-						if (p)
-							writer->rank = (RR->topology->amSupernode() ? p->lastDirectReceive() : p->lastUnicastFrame());
-						else writer->rank = writer->timestamp - 86400000;
-					}
-
 					++writer;
 					++count;
 				}
@@ -370,12 +371,9 @@ void Multicaster::clean(uint64_t now)
 			}
 
 			if (count) {
-				// There are remaining members, so re-sort them by rank and resize the vector
-				std::sort(mm->second.members.begin(),writer); // sorts in ascending order of rank
-				mm->second.members.resize(count); // trim off the ones we cut, after writer
+				mm->second.members.resize(count);
 				++mm;
 			} else if (mm->second.txQueue.empty()) {
-				// There are no remaining members and no pending multicasts, so erase the entry
 				_groups.erase(mm++);
 			} else {
 				mm->second.members.clear();
@@ -384,7 +382,7 @@ void Multicaster::clean(uint64_t now)
 	}
 }
 
-void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
+void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &member)
 {
 	// assumes _groups_m is locked
@@ -392,20 +390,14 @@ void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,Multi
 	if (member == RR->identity.address())
 		return;
 
-	// Update timestamp and learnedFrom if existing
 	for(std::vector<MulticastGroupMember>::iterator m(gs.members.begin());m!=gs.members.end();++m) {
 		if (m->address == member) {
-			if (m->learnedFrom != member) // once we learn it directly, remember this forever
-				m->learnedFrom = learnedFrom;
 			m->timestamp = now;
 			return;
 		}
 	}
 
-	// If not existing, add to end of list (highest priority) -- these will
-	// be resorted on next clean(). In the future we might want to insert
-	// this somewhere else but we'll try this for now.
-	gs.members.push_back(MulticastGroupMember(member,learnedFrom,now));
+	gs.members.push_back(MulticastGroupMember(member,now));
 
 	//TRACE("..MC %s joined multicast group %.16llx/%s via %s",member.toString().c_str(),nwid,mg.toString().c_str(),((learnedFrom) ? learnedFrom.toString().c_str() : "(direct)"));
@@ -414,9 +406,9 @@ void Multicaster::_add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,Multi
 	SharedPtr<Peer> p(RR->topology->getPeer(member));
 	if ((!p)||(!p->remoteVersionKnown())||(p->remoteVersionMajor() >= 1)) {
 		for(std::list<OutboundMulticast>::iterator tx(gs.txQueue.begin());tx!=gs.txQueue.end();) {
-			if (tx->atLimit())
+			if (tx->atLimit()) {
 				gs.txQueue.erase(tx++);
-			else {
+			} else {
 				tx->sendIfNew(RR,member);
 				if (tx->atLimit())
 					gs.txQueue.erase(tx++);
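Note: with the rank machinery gone, the surviving core of clean() is a two-pointer compaction: live members are copied down over expired ones and the vector is trimmed to the survivor count. The same idiom on a stand-in type (Member and the expire parameter are assumptions; the real loop also counts groups and handles the TX queue).

// Sketch: in-place reader/writer compaction as retained by clean() above.
#include <cstdint>
#include <vector>

struct Member { uint64_t address; uint64_t timestamp; };

void compactExpired(std::vector<Member> &members,uint64_t now,uint64_t expire)
{
	std::vector<Member>::iterator writer(members.begin());
	unsigned int count = 0;
	for(std::vector<Member>::iterator reader(members.begin());reader!=members.end();++reader) {
		if ((now - reader->timestamp) < expire) {
			*writer = *reader; // survivor: slide down over expired entries
			++writer;
			++count;
		}
	}
	members.resize(count); // trim the tail the survivors vacated
}

std::remove_if plus erase would be the STL equivalent; the explicit loop keeps the running survivor count that clean() uses to decide whether a group entry can be dropped entirely.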
diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp
index edfb62c6f..3aebe57fe 100644
--- a/node/Multicaster.hpp
+++ b/node/Multicaster.hpp
@@ -59,23 +59,17 @@ private:
 	struct MulticastGroupMember
 	{
 		MulticastGroupMember() {}
-		MulticastGroupMember(const Address &a,const Address &lf,uint64_t ts) : address(a),learnedFrom(lf),timestamp(ts),rank(0) {}
+		MulticastGroupMember(const Address &a,uint64_t ts) : address(a),timestamp(ts) {}
 
 		Address address;
-		Address learnedFrom;
-		uint64_t timestamp; // time of last LIKE/OK(GATHER)
-		uint64_t rank; // used by sorting algorithm in clean()
-
-		// for sorting in ascending order of rank
-		inline bool operator<(const MulticastGroupMember &m) const throw() { return (rank < m.rank); }
+		uint64_t timestamp; // time of last notification
 	};
 
 	struct MulticastGroupStatus
 	{
-		MulticastGroupStatus() : lastExplicitGather(0),totalKnownMembers(0) {}
+		MulticastGroupStatus() : lastExplicitGather(0) {}
 
 		uint64_t lastExplicitGather;
-		unsigned int totalKnownMembers; // 0 if unknown
 		std::list<OutboundMulticast> txQueue; // pending outbound multicasts
 		std::vector<MulticastGroupMember> members; // members of this group
 	};
@@ -90,13 +84,12 @@ public:
 	 * @param now Current time
 	 * @param nwid Network ID
 	 * @param mg Multicast group
-	 * @param learnedFrom Address from which we learned this member
 	 * @param member New member address
 	 */
-	inline void add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
+	inline void add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const Address &member)
 	{
 		Mutex::Lock _l(_groups_m);
-		_add(now,nwid,mg,_groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)],learnedFrom,member);
+		_add(now,nwid,mg,_groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)],member);
 	}
 
 	/**
@@ -107,12 +100,11 @@
 	 * @param now Current time
 	 * @param nwid Network ID
 	 * @param mg Multicast group
-	 * @param learnedFrom Peer from which we received this list
 	 * @param addresses Raw binary addresses in big-endian format, as a series of 5-byte fields
 	 * @param count Number of addresses
 	 * @param totalKnown Total number of known addresses as reported by peer
 	 */
-	void addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const Address &learnedFrom,const void *addresses,unsigned int count,unsigned int totalKnown);
+	void addMultiple(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const void *addresses,unsigned int count,unsigned int totalKnown);
 
 	/**
 	 * Append gather results to a packet by choosing registered multicast recipients at random
@@ -177,7 +169,7 @@ public:
 	void clean(uint64_t now);
 
 private:
-	void _add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member);
+	void _add(uint64_t now,uint64_t nwid,const MulticastGroup &mg,MulticastGroupStatus &gs,const Address &member);
 
 	const RuntimeEnvironment *RR;
 	std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus > _groups;
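Note: given the slimmed-down MulticastGroupMember above, the membership refresh that _add() now performs is a plain update-or-append. A sketch under those assumptions, with uint64_t standing in for Address and _add()'s self-address check and TX-queue catch-up elided.

// Sketch: update-or-append membership refresh matching the simplified
// MulticastGroupMember struct above.
#include <cstdint>
#include <vector>

struct GroupMember
{
	GroupMember(uint64_t a,uint64_t ts) : address(a),timestamp(ts) {}
	uint64_t address;
	uint64_t timestamp; // time of last notification
};

void refreshMember(std::vector<GroupMember> &members,uint64_t member,uint64_t now)
{
	for(std::vector<GroupMember>::iterator m(members.begin());m!=members.end();++m) {
		if (m->address == member) {
			m->timestamp = now; // already known: just refresh the timestamp
			return;
		}
	}
	members.push_back(GroupMember(member,now)); // new member: append to end
}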