diff --git a/node/Cluster.cpp b/node/Cluster.cpp index e6fc18eca..18ea9eb4a 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -161,8 +161,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) return; } - _Member &m = _members[fromMemberId]; - try { while (ptr < dmsg.size()) { const unsigned int mlen = dmsg.at(ptr); ptr += 2; @@ -177,6 +175,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) break; case CLUSTER_MESSAGE_ALIVE: { + _Member &m = _members[fromMemberId]; Mutex::Lock mlck(m.lock); ptr += 7; // skip version stuff, not used yet m.x = dmsg.at(ptr); ptr += 4; @@ -253,7 +252,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) peer->identity().serialize(buf); Mutex::Lock _l2(_members[fromMemberId].lock); _send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size()); - _flush(fromMemberId); // lookups are latency sensitive + _flush(fromMemberId); } } break; @@ -340,8 +339,11 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) } if (haveMatch) { - _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); - _flush(fromMemberId); + { + Mutex::Lock _l2(_members[fromMemberId].lock); + _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); + _flush(fromMemberId); + } RR->sw->send(rendezvousForLocal,true,0); } } @@ -382,7 +384,6 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee return; const uint64_t now = RR->node->now(); - unsigned int canHasPeer = 0; uint64_t mostRecentTs = 0; unsigned int mostRecentMemberId = 0xffffffff; @@ -413,7 +414,7 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee Mutex::Lock _l2(_members[*mid].lock); _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); if ((enqueueAndWait)&&(queueCount == 0)) - _flush(*mid); // send first query immediately to reduce latency + _flush(*mid); } } @@ -453,14 +454,14 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee { Mutex::Lock _l2(_members[mostRecentMemberId].lock); if (buf.size() > 0) { - _send(canHasPeer,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); - _flush(canHasPeer); // latency sensitive + _send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); + _flush(mostRecentMemberId); } if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0) - RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len); + RR->node->putPacket(InetAddress(),_members[mostRecentMemberId].zeroTierPhysicalEndpoints.front(),data,len); } - TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer); + TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId); } void Cluster::sendDistributedQuery(const Packet &pkt) @@ -472,7 +473,7 @@ void Cluster::sendDistributedQuery(const Packet &pkt) for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { Mutex::Lock _l2(_members[*mid].lock); _send(*mid,CLUSTER_MESSAGE_REMOTE_PACKET,buf.data(),buf.size()); - _flush(*mid); // these tend to be latency-sensitive + _flush(*mid); } } @@ -524,7 +525,7 @@ void Cluster::doPeriodicTasks() _send(*mid,CLUSTER_MESSAGE_ALIVE,alive.data(),alive.size()); } - _flush(*mid); // does nothing if nothing to flush + _flush(*mid); } } @@ -740,6 +741,53 @@ void Cluster::_flush(uint16_t memberId) } } +void Cluster::_doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep) +{ + if (remotep.payloadLength() >= ZT_ADDRESS_LENGTH) { + Identity queried(RR->topology->getIdentity(Address(remotep.payload(),ZT_ADDRESS_LENGTH))); + if (queried) { + Buffer<1024> routp; + remotep.source().appendTo(routp); + routp.append((uint8_t)Packet::VERB_OK); + routp.append((uint8_t)Packet::VERB_WHOIS); + routp.append(remotep.packetId()); + queried.serialize(routp); + + Mutex::Lock _l2(_members[fromMemberId].lock); + _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size()); + _flush(fromMemberId); + } + } +} + +void Cluster::_doREMOTE_MULTICAST_GATHER(uint64_t fromMemberId,const Packet &remotep) +{ + const uint64_t nwid = remotep.at(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_NETWORK_ID); + const MulticastGroup mg(MAC(remotep.field(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_MAC,6),6),remotep.at(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_ADI)); + unsigned int gatherLimit = remotep.at(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_GATHER_LIMIT); + const Address remotePeerAddress(remotep.source()); + + //TRACE("< routp; + remotePeerAddress.appendTo(routp); + routp.append((uint8_t)Packet::VERB_OK); + routp.append((uint8_t)Packet::VERB_MULTICAST_GATHER); + routp.append(remotep.packetId()); + routp.append(nwid); + mg.mac().appendTo(routp); + routp.append((uint32_t)mg.adi()); + + if (gatherLimit > ((ZT_CLUSTER_MAX_MESSAGE_LENGTH - 64) / 5)) + gatherLimit = ((ZT_CLUSTER_MAX_MESSAGE_LENGTH - 64) / 5); + if (RR->mc->gather(remotePeerAddress,nwid,mg,routp,gatherLimit)) { + Mutex::Lock _l2(_members[fromMemberId].lock); + _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size()); + } + } +} + } // namespace ZeroTier #endif // ZT_ENABLE_CLUSTER diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp index 01e6b7999..41838552d 100644 --- a/node/Multicaster.cpp +++ b/node/Multicaster.cpp @@ -78,7 +78,7 @@ void Multicaster::remove(uint64_t nwid,const MulticastGroup &mg,const Address &m } } -unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Packet &appendTo,unsigned int limit) const +unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Buffer &appendTo,unsigned int limit) const { unsigned char *p; unsigned int added = 0,i,k,rptr,totalKnown = 0; diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp index 898c4db72..8e6a7556c 100644 --- a/node/Multicaster.hpp +++ b/node/Multicaster.hpp @@ -146,7 +146,7 @@ public: * @return Number of addresses appended * @throws std::out_of_range Buffer overflow writing to packet */ - unsigned int gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Packet &appendTo,unsigned int limit) const; + unsigned int gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Buffer &appendTo,unsigned int limit) const; /** * Get subscribers to a multicast group