diff --git a/node/Cluster.cpp b/node/Cluster.cpp index c40fbd55e..e6fc18eca 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -83,9 +83,6 @@ Cluster::Cluster( _id(id), _zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints), _members(new _Member[ZT_CLUSTER_MAX_MEMBERS]), - _peerAffinities(65536), - _lastCleanedPeerAffinities(0), - _lastCheckedPeersForAnnounce(0), _lastFlushed(0), _lastCleanedRemotePeers(0) { @@ -231,7 +228,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) unsigned int qc = 0; { Mutex::Lock _l(_sendViaClusterQueue_m); - std::pair< std::multimap::iterator,std::multimap::iterator > er(_sendViaClusterQueue.equal_range(id.address())); + std::pair< std::multimap::iterator,std::multimap::iterator > er(_sendViaClusterQueue.equal_range(id.address())); for(std::multimap::iterator qi(er.first);qi!=er.second;) { if (qc >= ZT_CLUSTER_MAX_QUEUE_PER_SENDER) // sanity check break; @@ -252,12 +249,11 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; SharedPtr peer(RR->topology->getPeerNoCache(zeroTierAddress)); if ( (peer) && (peer->hasActiveDirectPath(RR->node->now())) ) { - Buffer<1024> buf; - peer->identity().serialize(buf); - Mutex::Lock _l2(_members[fromMemberId].lock); - _send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size()); - _flush(fromMemberId); // lookups are latency sensitive - } + Buffer<1024> buf; + peer->identity().serialize(buf); + Mutex::Lock _l2(_members[fromMemberId].lock); + _send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size()); + _flush(fromMemberId); // lookups are latency sensitive } } break; @@ -374,10 +370,16 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) } } -bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite) +void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite) { if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check - return false; + return; + + _sendViaClusterQueue_m.lock(); + const unsigned long queueCount = _sendViaClusterQueue.count(fromPeerAddress); + _sendViaClusterQueue_m.unlock(); + if (queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) + return; const uint64_t now = RR->node->now(); unsigned int canHasPeer = 0; @@ -399,6 +401,8 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee const uint64_t age = now - mostRecentTs; if (age >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) { + const bool enqueueAndWait = ((age >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId > 0xffff)); + // Poll everyone with WANT_PEER if the age of our most recent entry is // approaching expiration (or has expired, or does not exist). char tmp[ZT_ADDRESS_LENGTH]; @@ -408,18 +412,17 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { Mutex::Lock _l2(_members[*mid].lock); _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); - if (mostRecentMemberId > 0xffff) - _flush(*mid); // latency sensitive if we don't have one + if ((enqueueAndWait)&&(queueCount == 0)) + _flush(*mid); // send first query immediately to reduce latency } } // If there isn't a good place to send via, then enqueue this for retrying // later and return after having broadcasted a WANT_PEER. - if ((age >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId > 0xffff)) { + if (enqueueAndWait) { Mutex::Lock _l(_sendViaClusterQueue_m); - if (_sendViaClusterQueue.count(fromPeerAddress) < ZT_CLUSTER_MAX_QUEUE_PER_SENDER) - _sendViaClusterQueue.insert(std::pair(fromPeerAddress,new _SQE(now,toPeerAddress,data,len,unite))); - return true; + _sendViaClusterQueue.insert(std::pair(fromPeerAddress,new _SQE(now,toPeerAddress,data,len,unite))); + return; } } @@ -458,8 +461,6 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee } TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer); - - return true; } void Cluster::sendDistributedQuery(const Packet &pkt) @@ -643,18 +644,20 @@ void Cluster::status(ZT_ClusterStatus &status) const status.myId = _id; - ms[_id] = &(status.members[status.clusterSize++]); - ms[_id]->id = _id; - ms[_id]->alive = 1; - ms[_id]->x = _x; - ms[_id]->y = _y; - ms[_id]->z = _z; - ms[_id]->load = 0; // TODO - ms[_id]->peers = RR->topology->countActive(); - for(std::vector::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) { - if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check - break; - memcpy(&(ms[_id]->zeroTierPhysicalEndpoints[ms[_id]->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage)); + { + ZT_ClusterMemberStatus *const s = &(status.members[status.clusterSize++]); + s->id = _id; + s->alive = 1; + s->x = _x; + s->y = _y; + s->z = _z; + s->load = 0; // TODO + s->peers = RR->topology->countActive(); + for(std::vector::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) { + if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check + break; + memcpy(&(s->zeroTierPhysicalEndpoints[s->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage)); + } } { diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 957e84406..e45b2d82f 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -45,6 +45,7 @@ #include "Mutex.hpp" #include "SharedPtr.hpp" #include "Hashtable.hpp" +#include "Packet.hpp" /** * Timeout for cluster members being considered "alive" @@ -77,7 +78,6 @@ namespace ZeroTier { class RuntimeEnvironment; -class CertificateOfMembership; class MulticastGroup; class Peer; class Identity;