Refactor multicast group announcement to work directly or indirectly.

This commit is contained in:
Adam Ierymenko 2015-10-23 14:50:07 -07:00
parent e9648a6cdf
commit 35676217e8
3 changed files with 73 additions and 58 deletions

View File

@ -449,11 +449,12 @@ void Cluster::replicateHavePeer(const Identity &peerId)
void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group) void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group)
{ {
Buffer<4096> buf; Buffer<2048> buf;
buf.append((uint64_t)nwid); buf.append((uint64_t)nwid);
peerAddress.appendTo(buf); peerAddress.appendTo(buf);
group.mac().appendTo(buf); group.mac().appendTo(buf);
buf.append((uint32_t)group.adi()); buf.append((uint32_t)group.adi());
TRACE("replicating %s MULTICAST_LIKE %.16llx/%s/%u to all members",peerAddress.toString().c_str(),nwid,group.mac().toString().c_str(),(unsigned int)group.adi());
{ {
Mutex::Lock _l(_memberIds_m); Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@ -465,8 +466,9 @@ void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,co
void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com) void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com)
{ {
Buffer<4096> buf; Buffer<2048> buf;
com.serialize(buf); com.serialize(buf);
TRACE("replicating %s COM for %.16llx to all members",com.issuedTo().toString().c_str(),com.networkId());
{ {
Mutex::Lock _l(_memberIds_m); Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {

View File

@ -144,7 +144,15 @@ void Network::multicastUnsubscribe(const MulticastGroup &mg)
bool Network::tryAnnounceMulticastGroupsTo(const SharedPtr<Peer> &peer) bool Network::tryAnnounceMulticastGroupsTo(const SharedPtr<Peer> &peer)
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
return _tryAnnounceMulticastGroupsTo(RR->topology->rootAddresses(),_allMulticastGroups(),peer,RR->node->now()); if (
(_isAllowed(peer)) ||
(peer->address() == this->controller()) ||
(RR->topology->isRoot(peer->identity()))
) {
_announceMulticastGroupsTo(peer->address(),_allMulticastGroups());
return true;
}
return false;
} }
bool Network::applyConfiguration(const SharedPtr<NetworkConfig> &conf) bool Network::applyConfiguration(const SharedPtr<NetworkConfig> &conf)
@ -400,77 +408,80 @@ bool Network::_isAllowed(const SharedPtr<Peer> &peer) const
return false; // default position on any failure return false; // default position on any failure
} }
bool Network::_tryAnnounceMulticastGroupsTo(const std::vector<Address> &alwaysAddresses,const std::vector<MulticastGroup> &allMulticastGroups,const SharedPtr<Peer> &peer,uint64_t now) const class _GetPeersThatNeedMulticastAnnouncement
{
// assumes _lock is locked
if (
(_isAllowed(peer)) ||
(peer->address() == this->controller()) ||
(std::find(alwaysAddresses.begin(),alwaysAddresses.end(),peer->address()) != alwaysAddresses.end())
) {
if ((_config)&&(_config->com())&&(!_config->isPublic())&&(peer->needsOurNetworkMembershipCertificate(_id,now,true))) {
Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE);
_config->com().serialize(outp);
outp.armor(peer->key(),true);
peer->send(RR,outp.data(),outp.size(),now);
}
{
Packet outp(peer->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
for(std::vector<MulticastGroup>::const_iterator mg(allMulticastGroups.begin());mg!=allMulticastGroups.end();++mg) {
if ((outp.size() + 18) >= ZT_UDP_DEFAULT_PAYLOAD_MTU) {
outp.armor(peer->key(),true);
peer->send(RR,outp.data(),outp.size(),now);
outp.reset(peer->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
}
// network ID, MAC, ADI
outp.append((uint64_t)_id);
mg->mac().appendTo(outp);
outp.append((uint32_t)mg->adi());
}
if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH) {
outp.armor(peer->key(),true);
peer->send(RR,outp.data(),outp.size(),now);
}
}
return true;
}
return false;
}
class _AnnounceMulticastGroupsToAll
{ {
public: public:
_AnnounceMulticastGroupsToAll(const RuntimeEnvironment *renv,Network *nw) : _GetPeersThatNeedMulticastAnnouncement(const RuntimeEnvironment *renv,Network *nw) :
_now(renv->node->now()), _now(renv->node->now()),
_controller(nw->controller()),
_network(nw), _network(nw),
_rootAddresses(renv->topology->rootAddresses()), _rootAddresses(renv->topology->rootAddresses())
_allMulticastGroups(nw->_allMulticastGroups())
{} {}
inline void operator()(Topology &t,const SharedPtr<Peer> &p)
inline void operator()(Topology &t,const SharedPtr<Peer> &p) { _network->_tryAnnounceMulticastGroupsTo(_rootAddresses,_allMulticastGroups,p,_now); } {
if (
(_network->_isAllowed(p)) ||
(p->address() == _controller) ||
(std::find(_rootAddresses.begin(),_rootAddresses.end(),p->address()) != _rootAddresses.end())
) {
peers.push_back(p->address());
}
}
std::vector<Address> peers;
private: private:
uint64_t _now; uint64_t _now;
Address _controller;
Network *_network; Network *_network;
std::vector<Address> _rootAddresses; std::vector<Address> _rootAddresses;
std::vector<MulticastGroup> _allMulticastGroups;
}; };
void Network::_announceMulticastGroups() void Network::_announceMulticastGroups()
{ {
// Assumes _lock is locked // Assumes _lock is locked
_AnnounceMulticastGroupsToAll afunc(RR,this);
RR->topology->eachPeer<_AnnounceMulticastGroupsToAll &>(afunc); _GetPeersThatNeedMulticastAnnouncement gpfunc(RR,this);
RR->topology->eachPeer<_GetPeersThatNeedMulticastAnnouncement &>(gpfunc);
std::vector<MulticastGroup> allMulticastGroups(_allMulticastGroups());
for(std::vector<Address>::const_iterator pa(gpfunc.peers.begin());pa!=gpfunc.peers.end();++pa)
_announceMulticastGroupsTo(*pa,allMulticastGroups);
}
void Network::_announceMulticastGroupsTo(const Address &peerAddress,const std::vector<MulticastGroup> &allMulticastGroups) const
{
// Assumes _lock is locked
// We push COMs ahead of MULTICAST_LIKE since they're used for access control -- a COM is a public
// credential so "over-sharing" isn't really an issue (and we only do so with roots).
if ((_config)&&(_config->com())&&(!_config->isPublic())) {
Packet outp(peerAddress,RR->identity.address(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE);
_config->com().serialize(outp);
RR->sw->send(outp,true,0);
}
{
Packet outp(peerAddress,RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
for(std::vector<MulticastGroup>::const_iterator mg(allMulticastGroups.begin());mg!=allMulticastGroups.end();++mg) {
if ((outp.size() + 18) >= ZT_UDP_DEFAULT_PAYLOAD_MTU) {
RR->sw->send(outp,true,0);
outp.reset(peerAddress,RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
}
// network ID, MAC, ADI
outp.append((uint64_t)_id);
mg->mac().appendTo(outp);
outp.append((uint32_t)mg->adi());
}
if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH)
RR->sw->send(outp,true,0);
}
} }
std::vector<MulticastGroup> Network::_allMulticastGroups() const std::vector<MulticastGroup> Network::_allMulticastGroups() const
{ {
// Assumes _lock is locked // Assumes _lock is locked
std::vector<MulticastGroup> mgs; std::vector<MulticastGroup> mgs;
mgs.reserve(_myMulticastGroups.size() + _multicastGroupsBehindMe.size() + 1); mgs.reserve(_myMulticastGroups.size() + _multicastGroupsBehindMe.size() + 1);
mgs.insert(mgs.end(),_myMulticastGroups.begin(),_myMulticastGroups.end()); mgs.insert(mgs.end(),_myMulticastGroups.begin(),_myMulticastGroups.end());
@ -479,6 +490,7 @@ std::vector<MulticastGroup> Network::_allMulticastGroups() const
mgs.push_back(Network::BROADCAST); mgs.push_back(Network::BROADCAST);
std::sort(mgs.begin(),mgs.end()); std::sort(mgs.begin(),mgs.end());
mgs.erase(std::unique(mgs.begin(),mgs.end()),mgs.end()); mgs.erase(std::unique(mgs.begin(),mgs.end()),mgs.end());
return mgs; return mgs;
} }

View File

@ -56,7 +56,7 @@ namespace ZeroTier {
class RuntimeEnvironment; class RuntimeEnvironment;
class Peer; class Peer;
class _AnnounceMulticastGroupsToAll; // internal function object in Network.cpp class _GetPeersThatNeedMulticastAnnouncement;
/** /**
* A virtual LAN * A virtual LAN
@ -64,7 +64,7 @@ class _AnnounceMulticastGroupsToAll; // internal function object in Network.cpp
class Network : NonCopyable class Network : NonCopyable
{ {
friend class SharedPtr<Network>; friend class SharedPtr<Network>;
friend class _AnnounceMulticastGroupsToAll; friend class _GetPeersThatNeedMulticastAnnouncement; // internal function object
public: public:
/** /**
@ -344,6 +344,7 @@ private:
bool _isAllowed(const SharedPtr<Peer> &peer) const; bool _isAllowed(const SharedPtr<Peer> &peer) const;
bool _tryAnnounceMulticastGroupsTo(const std::vector<Address> &rootAddresses,const std::vector<MulticastGroup> &allMulticastGroups,const SharedPtr<Peer> &peer,uint64_t now) const; bool _tryAnnounceMulticastGroupsTo(const std::vector<Address> &rootAddresses,const std::vector<MulticastGroup> &allMulticastGroups,const SharedPtr<Peer> &peer,uint64_t now) const;
void _announceMulticastGroups(); void _announceMulticastGroups();
void _announceMulticastGroupsTo(const Address &peerAddress,const std::vector<MulticastGroup> &allMulticastGroups) const;
std::vector<MulticastGroup> _allMulticastGroups() const; std::vector<MulticastGroup> _allMulticastGroups() const;
const RuntimeEnvironment *RR; const RuntimeEnvironment *RR;