A simpler variant of the multicast propagation algorithm seems to perform better by being less deterministic. It may also be faster.
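For context: the old picker walked a sorted std::map and skipped a computed fraction of entries to approximate a random sample, while the new picker takes repeated uniform random draws from a std::vector. Below is a minimal standalone sketch of the two strategies; all names are illustrative, plain integers stand in for ZeroTier's Address type, and rand() stands in for the node's PRNG.

	#include <cstdint>
	#include <cstdlib>
	#include <map>
	#include <utility>
	#include <vector>

	typedef std::pair<uint64_t,uint64_t> Member; // (address, last LIKE time)

	// Old approach: walk the map in key order, probabilistically skipping a
	// precomputed fraction of entries to approximate a random sample. The
	// walk order itself is deterministic (sorted by address).
	static std::vector<uint64_t> sampleByMapWalk(std::map<uint64_t,uint64_t> &members,unsigned int want)
	{
		std::vector<uint64_t> out;
		if (members.empty())
			return out;
		unsigned long skippable = (members.size() > want) ? (unsigned long)(members.size() - want) : 0;
		double skipFraction = (double)skippable / (double)members.size();
		for(std::map<uint64_t,uint64_t>::iterator i(members.begin());i!=members.end();++i) {
			if ((skippable)&&(((double)rand() / (double)RAND_MAX) <= skipFraction)) {
				--skippable;
				continue;
			}
			out.push_back(i->first);
			if (out.size() >= want)
				break;
		}
		return out;
	}

	// New approach: a fixed number of uniform random index draws from a
	// vector; duplicate or filtered-out draws are simply wasted tries.
	static std::vector<uint64_t> sampleByRandomDraws(const std::vector<Member> &members,unsigned int tries)
	{
		std::vector<uint64_t> out;
		if (members.empty())
			return out;
		for(unsigned int t=0;t<tries;++t)
			out.push_back(members[(unsigned long)rand() % members.size()].first);
		return out;
	}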

Adam Ierymenko 2013-07-13 15:17:21 -04:00
parent 195ded4608
commit ca83f07b54


@@ -119,19 +119,6 @@ public:
 		return id.verifySignature(digest,signature,siglen);
 	}
-	/**
-	 * Update the most recent LIKE time for an address in a given multicast group on a given network
-	 *
-	 * @param nwid Network ID
-	 * @param mg Multicast group
-	 * @param addr Address that likes group on given network
-	 * @param now Current timestamp
-	 */
-	inline void likesMulticastGroup(const uint64_t nwid,const MulticastGroup &mg,const Address &addr,const uint64_t now)
-	{
-		_multicastMemberships[MulticastChannel(nwid,mg)][addr] = now;
-	}
 	/**
 	 * Compute the CRC64 code for multicast deduplication
 	 *
@@ -194,6 +181,27 @@ public:
 		_multicastHistory[mhi][1] = now;
 	}
+	/**
+	 * Update the most recent LIKE time for an address in a given multicast group on a given network
+	 *
+	 * @param nwid Network ID
+	 * @param mg Multicast group
+	 * @param addr Address that likes group on given network
+	 * @param now Current timestamp
+	 */
+	inline void likesMulticastGroup(const uint64_t nwid,const MulticastGroup &mg,const Address &addr,const uint64_t now)
+	{
+		Mutex::Lock _l(_multicastMemberships_m);
+		std::vector<MulticastMembership> &memberships = _multicastMemberships[MulticastChannel(nwid,mg)];
+		for(std::vector<MulticastMembership>::iterator mm(memberships.begin());mm!=memberships.end();++mm) {
+			if (mm->first == addr) {
+				mm->second = now;
+				return;
+			}
+		}
+		memberships.push_back(MulticastMembership(addr,now));
+	}
 	/**
 	 * Choose peers to send a propagating multicast to
 	 *
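The rewritten likesMulticastGroup() above trades the old map's keyed O(log n) update for a linear scan, in exchange for the O(1) random indexing the new sampling loop needs. A hedged sketch of that update-or-append pattern, with a plain integer standing in for Address:

	#include <cstdint>
	#include <utility>
	#include <vector>

	typedef std::pair<uint64_t,uint64_t> Membership; // (address, last LIKE time)

	// Update-or-append: O(n) per LIKE, but n (subscribers per group) stays
	// small, and the vector provides the O(1) random indexing that the new
	// propagation-peer sampling loop depends on.
	static void recordLike(std::vector<Membership> &memberships,uint64_t addr,uint64_t now)
	{
		for(std::vector<Membership>::iterator m(memberships.begin());m!=memberships.end();++m) {
			if (m->first == addr) {
				m->second = now; // existing member: refresh timestamp
				return;
			}
		}
		memberships.push_back(Membership(addr,now)); // new member: append
	}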
@@ -223,100 +231,50 @@
 		P *peers,
 		uint64_t now)
 	{
-		P toConsider[ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE];
-		unsigned int sampleSize = 0;
-		// Decay a few random bits in bloom filter to probabilistically eliminate
-		// false positives as we go. The odds of decaying an already-set bit
-		// increases as the bloom filter saturates, so in the early hops of
-		// propagation this likely won't have any effect. This allows peers with
-		// bloom filter collisions to be reconsidered, but at positions on the
-		// network graph likely to be hops away from the original origin of the
-		// multicast.
-		for(unsigned int i=0;i<ZT_MULTICAST_BLOOM_FILTER_DECAY_RATE;++i)
-			bf.decay((unsigned int)prng.next32());
-		// Sample a random subset of peers that we know have LIKEd this multicast
-		// group on this network.
-		std::map< MulticastChannel,std::map<Address,uint64_t> >::iterator channelMembers(_multicastMemberships.find(MulticastChannel(nwid,mg)));
-		if ((channelMembers != _multicastMemberships.end())&&(!channelMembers->second.empty())) {
-			unsigned long numEntriesPermittedToSkip = (channelMembers->second.size() > ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE) ? (unsigned long)(channelMembers->second.size() - ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE) : (unsigned long)0;
-			double skipWhatFraction = (double)numEntriesPermittedToSkip / (double)channelMembers->second.size();
-			std::map<Address,uint64_t>::iterator channelMemberEntry(channelMembers->second.begin());
-			while (channelMemberEntry != channelMembers->second.end()) {
-				// Auto-clean the channel members map if their LIKEs are expired. This will
-				// technically skew the random distribution of chosen members just a little, but
-				// it's unlikely that enough will expire in any single pick to make much of a
-				// difference overall.
-				if ((now - channelMemberEntry->second) > ZT_MULTICAST_LIKE_EXPIRE) {
-					channelMembers->second.erase(channelMemberEntry++);
-					continue;
-				}
-				// Skip some fraction of entries so that our sampling will be randomly distributed,
-				// since there is no other good way to sample randomly from a map.
-				if (numEntriesPermittedToSkip) {
-					if (prng.nextDouble() <= skipWhatFraction) {
-						--numEntriesPermittedToSkip;
-						++channelMemberEntry;
-						continue;
-					}
-				}
-				// If it's not expired and it's from our random sample, add it to the set of peers
-				// to consider. Exclude immediate upstream and original submitter, since we know for
-				// a fact they've already seen this. Also exclude things in the bloom filter.
-				if ((channelMemberEntry->first != originalSubmitter)&&(channelMemberEntry->first != upstream)) {
-					if (!bf.contains(channelMemberEntry->first.sum())) {
-						P peer = topology.getPeer(channelMemberEntry->first);
-						if ((peer)&&(peer->hasActiveDirectPath(now))) {
-							toConsider[sampleSize++] = peer;
-							if (sampleSize >= ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE)
-								break; // abort if we have enough candidates
-						}
-					}
-				}
-				++channelMemberEntry;
-			}
-			// Auto-clean: erase whole map if there are no more LIKEs for this channel
-			if (channelMembers->second.empty())
-				_multicastMemberships.erase(channelMembers);
-		}
-		// Sort in descending order of most recent direct unicast frame, picking
-		// peers with whom we have recently communicated. This is "implicit social
-		// switching."
-		std::sort(toConsider,toConsider + sampleSize,PeerPropagationPrioritySortOrder<P>());
-		// Pick the best N peers
-		unsigned int picked = 0;
-		for(unsigned int i=0;((i<sampleSize)&&(picked < max));++i) {
-			peers[picked++] = toConsider[i];
-			bf.set(toConsider[i]->address().sum());
-		}
-		// Add a supernode if there's nowhere else to go. Supernodes know of all multicast
-		// LIKEs and so can act to bridge sparse multicast groups.
-		if (!picked) {
+		typename std::set< P,_PeerPropagationPrioritySortOrder<P> > toConsider;
+		// Pick up to ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE peers that have
+		// subscribed to this channel and that are not in bloom filter.
+		// Pick randomly from subscribers, but place into a set that is
+		// sorted in descending order of time of most recent unicast
+		// frame transfer. (Implicit social ordering.) Also ignore original
+		// submitter and upstream, since we know these have seen this
+		// message.
+		{
+			Mutex::Lock _l(_multicastMemberships_m);
+			std::map< MulticastChannel,std::vector<MulticastMembership> >::iterator mm(_multicastMemberships.find(MulticastChannel(nwid,mg)));
+			if ((mm != _multicastMemberships.end())&&(!mm->second.empty())) {
+				for(unsigned int stries=0;stries<ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE;++stries) {
+					MulticastMembership &m = mm->second[prng.next32() % mm->second.size()];
+					if (((now - m.second) < ZT_MULTICAST_LIKE_EXPIRE)&&(!bf.contains(m.first.sum()))&&(m.first != originalSubmitter)&&(m.first != upstream)) {
+						P peer(topology.getPeer(m.first));
+						if (peer)
+							toConsider.insert(peer);
+					}
+				}
+			}
+		}
+		// The first peers in toConsider will be the 'best'
+		unsigned int chosen = 0;
+		for(typename std::set< P,_PeerPropagationPrioritySortOrder<P> >::iterator i(toConsider.begin());((i!=toConsider.end())&&(chosen < max));++i)
+			bf.set((peers[chosen++] = *i)->address().sum());
+		// Add a supernode if there are fewer than the desired
+		// number of recipients.
+		if (chosen < max) {
 			P peer = topology.getBestSupernode(&originalSubmitter,1,true);
 			if (peer)
-				peers[picked++] = peer;
+				peers[chosen++] = peer;
 		}
-		return picked;
+		return chosen;
 	}
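Both the old and new pickers gate candidates on a bloom filter of already-visited peers (bf.contains() / bf.set(), keyed on an address checksum via sum()). A minimal sketch of that idea, not the real filter class used here:

	#include <bitset>

	// Fixed-size visited-peer filter keyed on a small per-address hash
	// ("sum"). False positives are possible; false negatives are not.
	class VisitedFilter
	{
	public:
		void set(unsigned int sum) { _bits.set(sum % _bits.size()); }
		bool contains(unsigned int sum) const { return _bits.test(sum % _bits.size()); }
	private:
		std::bitset<512> _bits;
	};

Note that the old code also decayed random bits of the filter each hop (bf.decay()) to shed false positives as the filter saturated; the new version drops that step entirely.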
 private:
 	// Sort order for chosen propagation peers
 	template<typename P>
-	struct PeerPropagationPrioritySortOrder
+	struct _PeerPropagationPrioritySortOrder
 	{
 		inline bool operator()(const P &p1,const P &p2) const
 		{
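The renamed _PeerPropagationPrioritySortOrder comparator is what orders toConsider; per the comment in the new code, it sorts by descending time of most recent unicast frame ("implicit social ordering"). Since the comparator body is truncated in this view, here is a reduced sketch with a placeholder peer type (the real comparator compares through peer pointers):

	#include <cstdint>
	#include <set>

	struct SketchPeer { uint64_t lastUnicastFrame; };

	// Descending order of most recent unicast frame time, matching the
	// ordering described in the comment in the new code.
	struct ByMostRecentUnicast
	{
		bool operator()(const SketchPeer &a,const SketchPeer &b) const
		{
			return (a.lastUnicastFrame > b.lastUnicastFrame);
		}
	};

	// begin() yields the 'best' (most recently heard-from) candidates first.
	typedef std::set<SketchPeer,ByMostRecentUnicast> Candidates;

One std::set subtlety worth noting: two peers whose sort keys compare equal are treated as equivalent and only one is kept, which under this scheme merely costs a candidate slot.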
@@ -355,8 +313,10 @@ private:
 	// network ID and a multicast group within that network.
 	typedef std::pair<uint64_t,MulticastGroup> MulticastChannel;
-	// Address and time of last LIKE, by network ID and multicast group
-	std::map< MulticastChannel,std::map<Address,uint64_t> > _multicastMemberships;
+	// Address and time of last LIKE
+	typedef std::pair<Address,uint64_t> MulticastMembership;
+	std::map< MulticastChannel,std::vector<MulticastMembership> > _multicastMemberships;
+	Mutex _multicastMemberships_m;
 };
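Putting the storage change together: the membership index is now one flat vector of (address, last-LIKE-time) pairs per (network, group) channel under a single mutex, rather than a nested map. A simplified sketch of the layout, with a plain integer standing in for the real MulticastGroup:

	#include <cstdint>
	#include <map>
	#include <utility>
	#include <vector>

	typedef std::pair<uint64_t,uint32_t> Channel;     // (network ID, group) -- simplified
	typedef std::pair<uint64_t,uint64_t> Membership;  // (address, last LIKE time)

	// operator[] on the outer map creates the vector on first LIKE.
	typedef std::map< Channel,std::vector<Membership> > MembershipIndex;

Unlike the old map walk, the sampling loop in the code shown never erases expired entries; stale LIKEs are simply skipped when drawn and refreshed if the member LIKEs the group again.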