Make MulticastTopology have its own mutex.

This commit is contained in:
Adam Ierymenko 2014-09-23 10:26:30 -07:00
parent c49e253e21
commit 61d0f27d2a
4 changed files with 46 additions and 38 deletions

View File

@ -43,6 +43,7 @@ MulticastTopology::~MulticastTopology()
void MulticastTopology::add(const MulticastGroup &mg,const Address &member,const Address &learnedFrom) void MulticastTopology::add(const MulticastGroup &mg,const Address &member,const Address &learnedFrom)
{ {
Mutex::Lock _l(_groups_m);
std::vector<MulticastGroupMember> &mv = _groups[mg].members; std::vector<MulticastGroupMember> &mv = _groups[mg].members;
for(std::vector<MulticastGroupMember>::iterator m(mv.begin());m!=mv.end();++m) { for(std::vector<MulticastGroupMember>::iterator m(mv.begin());m!=mv.end();++m) {
if (m->address == member) { if (m->address == member) {
@ -57,6 +58,7 @@ void MulticastTopology::add(const MulticastGroup &mg,const Address &member,const
void MulticastTopology::erase(const MulticastGroup &mg,const Address &member) void MulticastTopology::erase(const MulticastGroup &mg,const Address &member)
{ {
Mutex::Lock _l(_groups_m);
std::map< MulticastGroup,MulticastGroupStatus >::iterator r(_groups.find(mg)); std::map< MulticastGroup,MulticastGroupStatus >::iterator r(_groups.find(mg));
if (r != _groups.end()) { if (r != _groups.end()) {
for(std::vector<MulticastGroupMember>::iterator m(r->second.members.begin());m!=r->second.members.end();++m) { for(std::vector<MulticastGroupMember>::iterator m(r->second.members.begin());m!=r->second.members.end();++m) {
@ -72,6 +74,7 @@ void MulticastTopology::erase(const MulticastGroup &mg,const Address &member)
unsigned int MulticastTopology::want(const MulticastGroup &mg,uint64_t now,unsigned int limit,bool updateLastGatheredTimeOnNonzeroReturn) unsigned int MulticastTopology::want(const MulticastGroup &mg,uint64_t now,unsigned int limit,bool updateLastGatheredTimeOnNonzeroReturn)
{ {
Mutex::Lock _l(_groups_m);
MulticastGroupStatus &gs = _groups[mg]; MulticastGroupStatus &gs = _groups[mg];
if ((unsigned int)gs.members.size() >= limit) { if ((unsigned int)gs.members.size() >= limit) {
// We already caught our limit, don't need to go fishing any more. // We already caught our limit, don't need to go fishing any more.
@ -90,6 +93,7 @@ unsigned int MulticastTopology::want(const MulticastGroup &mg,uint64_t now,unsig
void MulticastTopology::clean(uint64_t now,const Topology &topology) void MulticastTopology::clean(uint64_t now,const Topology &topology)
{ {
Mutex::Lock _l(_groups_m);
for(std::map< MulticastGroup,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) { for(std::map< MulticastGroup,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
std::vector<MulticastGroupMember>::iterator reader(mm->second.members.begin()); std::vector<MulticastGroupMember>::iterator reader(mm->second.members.begin());
std::vector<MulticastGroupMember>::iterator writer(mm->second.members.begin()); std::vector<MulticastGroupMember>::iterator writer(mm->second.members.begin());
@ -108,7 +112,7 @@ void MulticastTopology::clean(uint64_t now,const Topology &topology)
SharedPtr<Peer> p(topology.getPeer(writer->learnedFrom)); SharedPtr<Peer> p(topology.getPeer(writer->learnedFrom));
if (p) if (p)
writer->rank = p->lastUnicastFrame() - ZT_MULTICAST_LIKE_EXPIRE; writer->rank = p->lastUnicastFrame() - ZT_MULTICAST_LIKE_EXPIRE;
else writer->rank = writer->timestamp - 86400000; else writer->rank = writer->timestamp - (86400000 + ZT_MULTICAST_LIKE_EXPIRE);
} else { } else {
SharedPtr<Peer> p(topology.getPeer(writer->address)); SharedPtr<Peer> p(topology.getPeer(writer->address));
if (p) if (p)

View File

@ -38,6 +38,7 @@
#include "Address.hpp" #include "Address.hpp"
#include "MulticastGroup.hpp" #include "MulticastGroup.hpp"
#include "Utils.hpp" #include "Utils.hpp"
#include "Mutex.hpp"
namespace ZeroTier { namespace ZeroTier {
@ -45,8 +46,6 @@ class Topology;
/** /**
* Database of known multicast peers within a network * Database of known multicast peers within a network
*
* This structure is not guarded by a mutex; the caller must synchronize access.
*/ */
class MulticastTopology class MulticastTopology
{ {
@ -100,6 +99,7 @@ public:
*/ */
inline std::pair<uint64_t,unsigned int> groupStatus(const MulticastGroup &mg) const inline std::pair<uint64_t,unsigned int> groupStatus(const MulticastGroup &mg) const
{ {
Mutex::Lock _l(_groups_m);
std::map< MulticastGroup,MulticastGroupStatus >::const_iterator r(_groups.find(mg)); std::map< MulticastGroup,MulticastGroupStatus >::const_iterator r(_groups.find(mg));
return ((r != _groups.end()) ? std::pair<uint64_t,unsigned int>(r->second.lastGatheredMembers,r->second.members.size()) : std::pair<uint64_t,unsigned int>(0,0)); return ((r != _groups.end()) ? std::pair<uint64_t,unsigned int>(r->second.lastGatheredMembers,r->second.members.size()) : std::pair<uint64_t,unsigned int>(0,0));
} }
@ -122,6 +122,7 @@ public:
*/ */
inline void gatheringMembersNow(const MulticastGroup &mg,uint64_t now) inline void gatheringMembersNow(const MulticastGroup &mg,uint64_t now)
{ {
Mutex::Lock _l(_groups_m);
_groups[mg].lastGatheredMembers = now; _groups[mg].lastGatheredMembers = now;
} }
@ -136,6 +137,7 @@ public:
private: private:
std::map< MulticastGroup,MulticastGroupStatus > _groups; std::map< MulticastGroup,MulticastGroupStatus > _groups;
Mutex _groups_m;
}; };
} // namespace ZeroTier } // namespace ZeroTier

View File

@ -280,6 +280,7 @@ bool Network::isAllowed(const Address &peer) const
} }
void Network::clean() void Network::clean()
{
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
@ -316,9 +317,12 @@ void Network::clean()
_multicastGroupsBehindMe.erase(mg++); _multicastGroupsBehindMe.erase(mg++);
else ++mg; else ++mg;
} }
}
{
_multicastTopology.clean(now,*(_r->topology),(_config) ? _config->multicastLimit() : (unsigned int)ZT_DEFAULT_MULTICAST_LIMIT); _multicastTopology.clean(now,*(_r->topology),(_config) ? _config->multicastLimit() : (unsigned int)ZT_DEFAULT_MULTICAST_LIMIT);
} }
}
Network::Status Network::status() const Network::Status Network::status() const
{ {

View File

@ -249,23 +249,21 @@ public:
Status status() const; Status status() const;
/** /**
* Update multicast balance for an address and multicast group, return whether packet is allowed * Update and check multicast rate balance for a group
* *
* @param a Originating address of multicast packet
* @param mg Multicast group * @param mg Multicast group
* @param bytes Size of packet * @param bytes Size of packet
* @return True if packet is within budget * @return True if packet is within budget
*/ */
inline bool updateAndCheckMulticastBalance(const Address &a,const MulticastGroup &mg,unsigned int bytes) inline bool updateAndCheckMulticastBalance(const MulticastGroup &mg,unsigned int bytes)
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
if (!_config) if (!_config)
return false; return false;
std::pair<Address,MulticastGroup> k(a,mg); std::map< MulticastGroup,BandwidthAccount >::iterator bal(_multicastRateAccounts.find(mg));
std::map< std::pair<Address,MulticastGroup>,BandwidthAccount >::iterator bal(_multicastRateAccounts.find(k));
if (bal == _multicastRateAccounts.end()) { if (bal == _multicastRateAccounts.end()) {
NetworkConfig::MulticastRate r(_config->multicastRate(mg)); NetworkConfig::MulticastRate r(_config->multicastRate(mg));
bal = _multicastRateAccounts.insert(std::pair< std::pair<Address,MulticastGroup>,BandwidthAccount >(k,BandwidthAccount(r.preload,r.maxBalance,r.accrual))).first; bal = _multicastRateAccounts.insert(std::pair< MulticastGroup,BandwidthAccount >(mg,BandwidthAccount(r.preload,r.maxBalance,r.accrual))).first;
} }
return bal->second.deduct(bytes); return bal->second.deduct(bytes);
} }
@ -348,7 +346,7 @@ public:
} }
/** /**
* @return Set of currently assigned IP addresses * @return Set of IPs currently assigned to interface
*/ */
inline std::set<InetAddress> ips() const inline std::set<InetAddress> ips() const
{ {