Add some rate limiting to implicit gathers, and always send multicasts to at least one supernode so they can broadcast to old peers (temporary).

This commit is contained in:
Adam Ierymenko 2014-10-03 22:30:10 -07:00
parent e7c81ef34e
commit 62da7e67b6
3 changed files with 40 additions and 5 deletions

View File

@ -240,7 +240,12 @@
/** /**
* Delay between explicit MULTICAST_GATHER requests for a given multicast channel * Delay between explicit MULTICAST_GATHER requests for a given multicast channel
*/ */
#define ZT_MULTICAST_GATHER_DELAY (ZT_MULTICAST_LIKE_EXPIRE / 10) #define ZT_MULTICAST_EXPLICIT_GATHER_DELAY (ZT_MULTICAST_LIKE_EXPIRE / 10)
/**
* Minimum delay between implicit gathers via MULTICAST_FRAME
*/
#define ZT_MULTICAST_IMPLICIT_GATHER_DELAY 30000
/** /**
* Timeout for outgoing multicasts * Timeout for outgoing multicasts

View File

@ -124,6 +124,14 @@ void Multicaster::send(
Mutex::Lock _l(_groups_m); Mutex::Lock _l(_groups_m);
MulticastGroupStatus &gs = _groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)]; MulticastGroupStatus &gs = _groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)];
// TODO / DEPRECATED:
// Right now we also send all multicasts to at least one supernode.
// This supernode then relays them via the old multicast message
// type to pre 1.0.0 peers. We'll keep doing this until there aren't
// any of these on the network. Costs a bit of bandwidth, but maintains
// backward compability while people upgrade. Then this code can die.
bool gotASupernode = false;
if (gs.members.size() >= limit) { if (gs.members.size() >= limit) {
// If we already have enough members, just send and we're done -- no need for TX queue // If we already have enough members, just send and we're done -- no need for TX queue
OutboundMulticast out; OutboundMulticast out;
@ -144,13 +152,21 @@ void Multicaster::send(
unsigned int count = 0; unsigned int count = 0;
for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) { for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
out.sendOnly(*(RR->sw),m->address); // sendOnly() avoids overhead of creating sent log since we're going to discard this immediately out.sendOnly(*(RR->sw),m->address); // sendOnly() avoids overhead of creating sent log since we're going to discard this immediately
if (RR->topology->isSupernode(m->address))
gotASupernode = true;
if (++count >= limit) if (++count >= limit)
break; break;
} }
if (!gotASupernode) {
SharedPtr<Peer> sn(RR->topology->getBestSupernode());
if (sn)
out.sendOnly(*(RR->sw),sn->address());
}
} else { } else {
unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1; unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_GATHER_DELAY) { if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) {
gs.lastExplicitGather = now; gs.lastExplicitGather = now;
// TODO / INPROGRESS: right now supernodes track multicast LIKEs, a relic // TODO / INPROGRESS: right now supernodes track multicast LIKEs, a relic
@ -173,6 +189,10 @@ void Multicaster::send(
gatherLimit = 0; // once we've done this we don't need to do it implicitly gatherLimit = 0; // once we've done this we don't need to do it implicitly
} }
if ((gatherLimit > 0)&&((now - gs.lastImplicitGather) > ZT_MULTICAST_IMPLICIT_GATHER_DELAY))
gs.lastImplicitGather = now;
else gatherLimit = 0;
gs.txQueue.push_back(OutboundMulticast()); gs.txQueue.push_back(OutboundMulticast());
OutboundMulticast &out = gs.txQueue.back(); OutboundMulticast &out = gs.txQueue.back();
@ -189,8 +209,17 @@ void Multicaster::send(
data, data,
len); len);
for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
out.sendAndLog(*(RR->sw),m->address); out.sendAndLog(*(RR->sw),m->address);
if (RR->topology->isSupernode(m->address))
gotASupernode = true;
}
if (!gotASupernode) {
SharedPtr<Peer> sn(RR->topology->getBestSupernode());
if (sn)
out.sendAndLog(*(RR->sw),sn->address());
}
} }
} }

View File

@ -72,9 +72,10 @@ private:
struct MulticastGroupStatus struct MulticastGroupStatus
{ {
MulticastGroupStatus() : lastExplicitGather(0) {} MulticastGroupStatus() : lastExplicitGather(0),lastImplicitGather(0) {}
uint64_t lastExplicitGather; // time we last gathered members explicitly uint64_t lastExplicitGather;
uint64_t lastImplicitGather;
std::list<OutboundMulticast> txQueue; // pending outbound multicasts std::list<OutboundMulticast> txQueue; // pending outbound multicasts
std::vector<MulticastGroupMember> members; // members of this group std::vector<MulticastGroupMember> members; // members of this group
}; };