From 57b71bfff0dc50505495e64b75988227d4b85905 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Sun, 8 Nov 2015 13:57:02 -0800 Subject: [PATCH] Cluster simplification and refactor work in progress... --- node/Cluster.cpp | 501 ++++++++++++++++++++++------------------ node/Cluster.hpp | 119 ++++++---- node/IncomingPacket.cpp | 47 ++-- node/Packet.hpp | 25 +- node/Peer.cpp | 14 +- 5 files changed, 371 insertions(+), 335 deletions(-) diff --git a/node/Cluster.cpp b/node/Cluster.cpp index 9c954fa3f..c40fbd55e 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -86,7 +86,8 @@ Cluster::Cluster( _peerAffinities(65536), _lastCleanedPeerAffinities(0), _lastCheckedPeersForAnnounce(0), - _lastFlushed(0) + _lastFlushed(0), + _lastCleanedRemotePeers(0) { uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; @@ -107,6 +108,9 @@ Cluster::~Cluster() Utils::burn(_masterSecret,sizeof(_masterSecret)); Utils::burn(_key,sizeof(_key)); delete [] _members; + + for(std::multimap::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) + delete qi->second; } void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) @@ -160,222 +164,263 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) return; } - { - _Member &m = _members[fromMemberId]; - Mutex::Lock mlck(m.lock); + _Member &m = _members[fromMemberId]; - try { - while (ptr < dmsg.size()) { - const unsigned int mlen = dmsg.at(ptr); ptr += 2; - const unsigned int nextPtr = ptr + mlen; - if (nextPtr > dmsg.size()) - break; + try { + while (ptr < dmsg.size()) { + const unsigned int mlen = dmsg.at(ptr); ptr += 2; + const unsigned int nextPtr = ptr + mlen; + if (nextPtr > dmsg.size()) + break; - int mtype = -1; - try { - switch((StateMessageType)(mtype = (int)dmsg[ptr++])) { - default: - break; + int mtype = -1; + try { + switch((StateMessageType)(mtype = (int)dmsg[ptr++])) { + default: + break; - case STATE_MESSAGE_ALIVE: { - ptr += 7; // skip version stuff, not used yet - m.x = dmsg.at(ptr); ptr += 4; - m.y = dmsg.at(ptr); ptr += 4; - m.z = dmsg.at(ptr); ptr += 4; - ptr += 8; // skip local clock, not used - m.load = dmsg.at(ptr); ptr += 8; - m.peers = dmsg.at(ptr); ptr += 8; - ptr += 8; // skip flags, unused + case CLUSTER_MESSAGE_ALIVE: { + Mutex::Lock mlck(m.lock); + ptr += 7; // skip version stuff, not used yet + m.x = dmsg.at(ptr); ptr += 4; + m.y = dmsg.at(ptr); ptr += 4; + m.z = dmsg.at(ptr); ptr += 4; + ptr += 8; // skip local clock, not used + m.load = dmsg.at(ptr); ptr += 8; + m.peers = dmsg.at(ptr); ptr += 8; + ptr += 8; // skip flags, unused #ifdef ZT_TRACE - std::string addrs; -#endif - unsigned int physicalAddressCount = dmsg[ptr++]; - m.zeroTierPhysicalEndpoints.clear(); - for(unsigned int i=0;i 0) - addrs.push_back(','); - addrs.append(m.zeroTierPhysicalEndpoints.back().toString()); - } + std::string addrs; #endif + unsigned int physicalAddressCount = dmsg[ptr++]; + m.zeroTierPhysicalEndpoints.clear(); + for(unsigned int i=0;inode->now() - m.lastReceivedAliveAnnouncement) >= ZT_CLUSTER_TIMEOUT) { - TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str()); + else { + if (addrs.length() > 0) + addrs.push_back(','); + addrs.append(m.zeroTierPhysicalEndpoints.back().toString()); } #endif - m.lastReceivedAliveAnnouncement = RR->node->now(); - } break; + } +#ifdef ZT_TRACE + if ((RR->node->now() - m.lastReceivedAliveAnnouncement) >= ZT_CLUSTER_TIMEOUT) { + TRACE("[%u] I'm alive! 
peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str()); + } +#endif + m.lastReceivedAliveAnnouncement = RR->node->now(); + } break; - case STATE_MESSAGE_HAVE_PEER: { - const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - Mutex::Lock _l2(_peerAffinities_m); - _peerAffinities.set(zeroTierAddress,fromMemberId); - TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str()); - } break; + case CLUSTER_MESSAGE_HAVE_PEER: { + Identity id; + ptr += id.deserialize(dmsg,ptr); + if (id) { + RR->topology->saveIdentity(id); - case STATE_MESSAGE_WANT_PEER: { - const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - SharedPtr peer(RR->topology->getPeerNoCache(zeroTierAddress)); - if ((peer)&&(peer->hasActiveDirectPath(RR->node->now()))) { - char buf[ZT_ADDRESS_LENGTH]; - peer->address().copyTo(buf,ZT_ADDRESS_LENGTH); - Mutex::Lock _l2(_members[fromMemberId].lock); - _send(fromMemberId,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH); - _flush(fromMemberId); + { + Mutex::Lock _l(_remotePeers_m); + _remotePeers[std::pair(id.address(),(unsigned int)fromMemberId)] = RR->node->now(); } - } break; - case STATE_MESSAGE_MULTICAST_LIKE: { - const uint64_t nwid = dmsg.at(ptr); ptr += 8; - const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - const MAC mac(dmsg.field(ptr,6),6); ptr += 6; - const uint32_t adi = dmsg.at(ptr); ptr += 4; - RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address); - TRACE("[%u] %s likes %s/%.8x on %.16llx",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid); - } break; - - case STATE_MESSAGE_COM: { - /* not currently used so not decoded yet - CertificateOfMembership com; - ptr += com.deserialize(dmsg,ptr); - if (com) { - TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision()); - } - */ - } break; - - case STATE_MESSAGE_PROXY_UNITE: { - const Address localPeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - const Address remotePeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - const unsigned int numRemotePeerPaths = dmsg[ptr++]; - InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max - for(unsigned int i=0;inode->now(); - SharedPtr localPeer(RR->topology->getPeerNoCache(localPeerAddress)); - if ((localPeer)&&(numRemotePeerPaths > 0)) { - InetAddress bestLocalV4,bestLocalV6; - localPeer->getBestActiveAddresses(now,bestLocalV4,bestLocalV6); - - InetAddress bestRemoteV4,bestRemoteV6; - for(unsigned int i=0;i q[ZT_CLUSTER_MAX_QUEUE_PER_SENDER]; + unsigned int qc = 0; + { + Mutex::Lock _l(_sendViaClusterQueue_m); + std::pair< std::multimap::iterator,std::multimap::iterator > er(_sendViaClusterQueue.equal_range(id.address())); + for(std::multimap::iterator qi(er.first);qi!=er.second;) { + if (qc >= ZT_CLUSTER_MAX_QUEUE_PER_SENDER) // sanity check break; - switch(remotePeerPaths[i].ss_family) { - case AF_INET: - if (!bestRemoteV4) - bestRemoteV4 = remotePeerPaths[i]; - break; - case AF_INET6: - if (!bestRemoteV6) - bestRemoteV6 = remotePeerPaths[i]; - break; - } - } - - Packet rendezvousForLocal(localPeerAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS); - 
rendezvousForLocal.append((uint8_t)0); - remotePeerAddress.appendTo(rendezvousForLocal); - - Buffer<2048> rendezvousForRemote; - remotePeerAddress.appendTo(rendezvousForRemote); - rendezvousForRemote.append((uint8_t)Packet::VERB_RENDEZVOUS); - const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForRemote.size(); - rendezvousForRemote.addSize(2); // space for actual packet payload length - rendezvousForRemote.append((uint8_t)0); // flags == 0 - localPeerAddress.appendTo(rendezvousForRemote); - - bool haveMatch = false; - if ((bestLocalV6)&&(bestRemoteV6)) { - haveMatch = true; - - rendezvousForLocal.append((uint16_t)bestRemoteV6.port()); - rendezvousForLocal.append((uint8_t)16); - rendezvousForLocal.append(bestRemoteV6.rawIpData(),16); - - rendezvousForRemote.append((uint16_t)bestLocalV6.port()); - rendezvousForRemote.append((uint8_t)16); - rendezvousForRemote.append(bestLocalV6.rawIpData(),16); - rendezvousForRemote.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); - } else if ((bestLocalV4)&&(bestRemoteV4)) { - haveMatch = true; - - rendezvousForLocal.append((uint16_t)bestRemoteV4.port()); - rendezvousForLocal.append((uint8_t)4); - rendezvousForLocal.append(bestRemoteV4.rawIpData(),4); - - rendezvousForRemote.append((uint16_t)bestLocalV4.port()); - rendezvousForRemote.append((uint8_t)4); - rendezvousForRemote.append(bestLocalV4.rawIpData(),4); - rendezvousForRemote.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); - } - - if (haveMatch) { - _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); - _flush(fromMemberId); - RR->sw->send(rendezvousForLocal,true,0); + q[qc++] = *qi; + _sendViaClusterQueue.erase(qi++); } } - } break; + for(unsigned int i=0;isendViaCluster(q[i].first,q[i].second->toPeerAddress,q[i].second->data,q[i].second->len,q[i].second->unite); + delete q[i].second; + } - case STATE_MESSAGE_PROXY_SEND: { - const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - const Packet::Verb verb = (Packet::Verb)dmsg[ptr++]; - const unsigned int len = dmsg.at(ptr); ptr += 2; - Packet outp(rcpt,RR->identity.address(),verb); - outp.append(dmsg.field(ptr,len),len); ptr += len; - RR->sw->send(outp,true,0); - TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len); - } break; - } - } catch ( ... 
) { - TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype); - // drop invalids + TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str()); + } + } break; + + case CLUSTER_MESSAGE_WANT_PEER: { + const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + SharedPtr peer(RR->topology->getPeerNoCache(zeroTierAddress)); + if ( (peer) && (peer->hasActiveDirectPath(RR->node->now())) ) { + Buffer<1024> buf; + peer->identity().serialize(buf); + Mutex::Lock _l2(_members[fromMemberId].lock); + _send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size()); + _flush(fromMemberId); // lookups are latency sensitive + } + } + } break; + + case CLUSTER_MESSAGE_REMOTE_PACKET: { + const unsigned int plen = dmsg.at(ptr); ptr += 2; + if (plen) { + Packet remotep(dmsg.field(ptr,plen),plen); ptr += plen; + TRACE("remote %s from %s via %u (%u bytes)",Packet::verbString(remotep.verb()),remotep.source().toString().c_str(),fromMemberId,plen); + switch(remotep.verb()) { + case Packet::VERB_WHOIS: _doREMOTE_WHOIS(fromMemberId,remotep); break; + case Packet::VERB_MULTICAST_GATHER: _doREMOTE_MULTICAST_GATHER(fromMemberId,remotep); break; + default: break; // ignore things we don't care about across cluster + } + } + } break; + + case CLUSTER_MESSAGE_PROXY_UNITE: { + const Address localPeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + const Address remotePeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + const unsigned int numRemotePeerPaths = dmsg[ptr++]; + InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max + for(unsigned int i=0;inode->now(); + SharedPtr localPeer(RR->topology->getPeerNoCache(localPeerAddress)); + if ((localPeer)&&(numRemotePeerPaths > 0)) { + InetAddress bestLocalV4,bestLocalV6; + localPeer->getBestActiveAddresses(now,bestLocalV4,bestLocalV6); + + InetAddress bestRemoteV4,bestRemoteV6; + for(unsigned int i=0;iidentity.address(),Packet::VERB_RENDEZVOUS); + rendezvousForLocal.append((uint8_t)0); + remotePeerAddress.appendTo(rendezvousForLocal); + + Buffer<2048> rendezvousForRemote; + remotePeerAddress.appendTo(rendezvousForRemote); + rendezvousForRemote.append((uint8_t)Packet::VERB_RENDEZVOUS); + const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForRemote.size(); + rendezvousForRemote.addSize(2); // space for actual packet payload length + rendezvousForRemote.append((uint8_t)0); // flags == 0 + localPeerAddress.appendTo(rendezvousForRemote); + + bool haveMatch = false; + if ((bestLocalV6)&&(bestRemoteV6)) { + haveMatch = true; + + rendezvousForLocal.append((uint16_t)bestRemoteV6.port()); + rendezvousForLocal.append((uint8_t)16); + rendezvousForLocal.append(bestRemoteV6.rawIpData(),16); + + rendezvousForRemote.append((uint16_t)bestLocalV6.port()); + rendezvousForRemote.append((uint8_t)16); + rendezvousForRemote.append(bestLocalV6.rawIpData(),16); + rendezvousForRemote.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); + } else if ((bestLocalV4)&&(bestRemoteV4)) { + haveMatch = true; + + rendezvousForLocal.append((uint16_t)bestRemoteV4.port()); + rendezvousForLocal.append((uint8_t)4); + rendezvousForLocal.append(bestRemoteV4.rawIpData(),4); + + rendezvousForRemote.append((uint16_t)bestLocalV4.port()); + rendezvousForRemote.append((uint8_t)4); + rendezvousForRemote.append(bestLocalV4.rawIpData(),4); + 
rendezvousForRemote.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); + } + + if (haveMatch) { + _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); + _flush(fromMemberId); + RR->sw->send(rendezvousForLocal,true,0); + } + } + } break; + + case CLUSTER_MESSAGE_PROXY_SEND: { + const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + const Packet::Verb verb = (Packet::Verb)dmsg[ptr++]; + const unsigned int len = dmsg.at(ptr); ptr += 2; + Packet outp(rcpt,RR->identity.address(),verb); + outp.append(dmsg.field(ptr,len),len); ptr += len; + RR->sw->send(outp,true,0); + //TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len); + } break; } - - ptr = nextPtr; + } catch ( ... ) { + TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype); + // drop invalids } - } catch ( ... ) { - TRACE("invalid message (outer loop), discarding"); - // drop invalids + + ptr = nextPtr; } + } catch ( ... ) { + TRACE("invalid message (outer loop), discarding"); + // drop invalids } } bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite) { - if (len > 16384) // sanity check + if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check return false; const uint64_t now = RR->node->now(); unsigned int canHasPeer = 0; + uint64_t mostRecentTs = 0; + unsigned int mostRecentMemberId = 0xffffffff; { - Mutex::Lock _l2(_peerAffinities_m); - const unsigned int *pa = _peerAffinities.get(toPeerAddress); - if (!pa) { - char buf[ZT_ADDRESS_LENGTH]; - peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH); - { - Mutex::Lock _l(_memberIds_m); - for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { - Mutex::Lock _l2(_members[*mid].lock); - _send(*mid,STATE_MESSAGE_WANT_PEER,buf,ZT_ADDRESS_LENGTH); - } + Mutex::Lock _l2(_remotePeers_m); + std::map< std::pair,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair(fromPeerAddress,0))); + for(;;) { + if ((rpe == _remotePeers.end())||(rpe->first.first != fromPeerAddress)) + break; + else if (rpe->second > mostRecentTs) { + mostRecentTs = rpe->second; + mostRecentMemberId = rpe->first.second; } - return false; } - canHasPeer = *pa; + } + + const uint64_t age = now - mostRecentTs; + if (age >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) { + // Poll everyone with WANT_PEER if the age of our most recent entry is + // approaching expiration (or has expired, or does not exist). + char tmp[ZT_ADDRESS_LENGTH]; + toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH); + { + Mutex::Lock _l(_memberIds_m); + for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); + if (mostRecentMemberId > 0xffff) + _flush(*mid); // latency sensitive if we don't have one + } + } + + // If there isn't a good place to send via, then enqueue this for retrying + // later and return after having broadcasted a WANT_PEER. 
+ if ((age >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId > 0xffff)) { + Mutex::Lock _l(_sendViaClusterQueue_m); + if (_sendViaClusterQueue.count(fromPeerAddress) < ZT_CLUSTER_MAX_QUEUE_PER_SENDER) + _sendViaClusterQueue.insert(std::pair(fromPeerAddress,new _SQE(now,toPeerAddress,data,len,unite))); + return true; + } } Buffer<1024> buf; @@ -401,11 +446,14 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee v6.serialize(buf); } } + { - Mutex::Lock _l2(_members[canHasPeer].lock); - if (buf.size() > 0) - _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); - if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0) + Mutex::Lock _l2(_members[mostRecentMemberId].lock); + if (buf.size() > 0) { + _send(canHasPeer,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); + _flush(canHasPeer); // latency sensitive + } + if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0) RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len); } @@ -414,62 +462,43 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee return true; } -void Cluster::replicateHavePeer(const Identity &peerId) +void Cluster::sendDistributedQuery(const Packet &pkt) { - char buf[ZT_ADDRESS_LENGTH]; - peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH); - { - Mutex::Lock _l(_memberIds_m); - for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { - Mutex::Lock _l2(_members[*mid].lock); - _send(*mid,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH); - } - } -} - -void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group) -{ - Buffer<1024> buf; - buf.append((uint64_t)nwid); - peerAddress.appendTo(buf); - group.mac().appendTo(buf); - buf.append((uint32_t)group.adi()); - TRACE("replicating %s MULTICAST_LIKE %.16llx/%s/%u to all members",peerAddress.toString().c_str(),nwid,group.mac().toString().c_str(),(unsigned int)group.adi()); - { - Mutex::Lock _l(_memberIds_m); - for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { - Mutex::Lock _l2(_members[*mid].lock); - _send(*mid,STATE_MESSAGE_MULTICAST_LIKE,buf.data(),buf.size()); - } - } -} - -void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com) -{ - /* not used yet, so don't do this yet Buffer<4096> buf; - com.serialize(buf); - TRACE("replicating %s COM for %.16llx to all members",com.issuedTo().toString().c_str(),com.networkId()); - { - Mutex::Lock _l(_memberIds_m); - for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { - Mutex::Lock _l2(_members[*mid].lock); - _send(*mid,STATE_MESSAGE_COM,buf.data(),buf.size()); - } + buf.append((uint16_t)pkt.size()); + buf.append(pkt.data(),pkt.size()); + Mutex::Lock _l(_memberIds_m); + for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,CLUSTER_MESSAGE_REMOTE_PACKET,buf.data(),buf.size()); + _flush(*mid); // these tend to be latency-sensitive } - */ } void Cluster::doPeriodicTasks() { const uint64_t now = RR->node->now(); + if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) { _lastFlushed = now; + + { + Mutex::Lock _l2(_sendViaClusterQueue_m); + for(std::multimap::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { + if ((now - qi->second->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION) { + delete qi->second; + _sendViaClusterQueue.erase(qi++); 
+ } else ++qi; + } + } + Mutex::Lock _l(_memberIds_m); for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { Mutex::Lock _l2(_members[*mid].lock); if ((now - _members[*mid].lastAnnouncedAliveTo) >= ((ZT_CLUSTER_TIMEOUT / 2) - 1000)) { + _members[*mid].lastAnnouncedAliveTo = now; + Buffer<2048> alive; alive.append((uint16_t)ZEROTIER_ONE_VERSION_MAJOR); alive.append((uint16_t)ZEROTIER_ONE_VERSION_MINOR); @@ -491,13 +520,23 @@ void Cluster::doPeriodicTasks() alive.append((uint8_t)_zeroTierPhysicalEndpoints.size()); for(std::vector::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe) pe->serialize(alive); - _send(*mid,STATE_MESSAGE_ALIVE,alive.data(),alive.size()); - _members[*mid].lastAnnouncedAliveTo = now; + _send(*mid,CLUSTER_MESSAGE_ALIVE,alive.data(),alive.size()); } _flush(*mid); // does nothing if nothing to flush } } + + if ((now - _lastCleanedRemotePeers) >= (ZT_PEER_ACTIVITY_TIMEOUT * 2)) { + _lastCleanedRemotePeers = now; + + Mutex::Lock _l(_remotePeers_m); + for(std::map< std::pair,uint64_t >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) { + if ((now - rp->second) >= ZT_PEER_ACTIVITY_TIMEOUT) + _remotePeers.erase(rp++); + else ++rp; + } + } } void Cluster::addMember(uint16_t memberId) diff --git a/node/Cluster.hpp b/node/Cluster.hpp index c5d110d04..957e84406 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -32,6 +32,8 @@ #include #include +#include +#include #include "Constants.hpp" #include "../include/ZeroTierOne.h" @@ -62,6 +64,16 @@ */ #define ZT_CLUSTER_FLUSH_PERIOD 500 +/** + * Maximum number of queued outgoing packets per sender address + */ +#define ZT_CLUSTER_MAX_QUEUE_PER_SENDER 8 + +/** + * Expiration time for send queue entries + */ +#define ZT_CLUSTER_QUEUE_EXPIRATION 2500 + namespace ZeroTier { class RuntimeEnvironment; @@ -98,7 +110,7 @@ public: */ enum StateMessageType { - STATE_MESSAGE_NOP = 0, + CLUSTER_MESSAGE_NOP = 0, /** * This cluster member is alive: @@ -115,35 +127,46 @@ public: * <[8] flags (currently unused, must be zero)> * <[1] number of preferred ZeroTier endpoints> * <[...] InetAddress(es) of preferred ZeroTier endpoint(s)> + * + * Cluster members constantly broadcast an alive heartbeat and will only + * receive peer redirects if they've done so within the timeout. */ - STATE_MESSAGE_ALIVE = 1, + CLUSTER_MESSAGE_ALIVE = 1, /** * Cluster member has this peer: - * <[5] ZeroTier address of peer> + * <[...] serialized identity of peer> + * + * This is typically sent in response to WANT_PEER but can also be pushed + * to prepopulate if this makes sense. */ - STATE_MESSAGE_HAVE_PEER = 2, + CLUSTER_MESSAGE_HAVE_PEER = 2, /** * Cluster member wants this peer: * <[5] ZeroTier address of peer> + * + * Members that have a direct link to this peer will respond with + * HAVE_PEER. */ - STATE_MESSAGE_WANT_PEER = 3, + CLUSTER_MESSAGE_WANT_PEER = 3, /** - * Peer subscription to multicast group: - * <[8] network ID> - * <[5] peer ZeroTier address> - * <[6] MAC address of multicast group> - * <[4] 32-bit multicast group ADI> + * A remote packet that we should also possibly respond to: + * <[2] 16-bit length of remote packet> + * <[...] remote packet payload> + * + * Cluster members may relay requests by relaying the request packet. + * These may include requests such as WHOIS and MULTICAST_GATHER. The + * packet must be already decrypted, decompressed, and authenticated. 
+ * + * This can only be used for small request packets as per the cluster + * message size limit, but since these are the only ones in question + * this is fine. + * + * If a response is generated it is sent via PROXY_SEND. */ - STATE_MESSAGE_MULTICAST_LIKE = 4, - - /** - * Certificate of network membership for a peer: - * <[...] serialized COM> - */ - STATE_MESSAGE_COM = 5, + CLUSTER_MESSAGE_REMOTE_PACKET = 4, /** * Request that VERB_RENDEZVOUS be sent to a peer that we have: @@ -157,7 +180,7 @@ public: * info for its peer, and we send VERB_RENDEZVOUS to both sides: to ours * directly and with PROXY_SEND to theirs. */ - STATE_MESSAGE_PROXY_UNITE = 6, + CLUSTER_MESSAGE_PROXY_UNITE = 5, /** * Request that a cluster member send a packet to a locally-known peer: @@ -173,7 +196,7 @@ public: * while PROXY_SEND is used to implement proxy sending (which right * now is only used to send RENDEZVOUS). */ - STATE_MESSAGE_PROXY_SEND = 7, + CLUSTER_MESSAGE_PROXY_SEND = 6, /** * Replicate a network config for a network we belong to: @@ -186,7 +209,7 @@ public: * * TODO: not implemented yet! */ - STATE_MESSAGE_NETWORK_CONFIG = 8 + CLUSTER_MESSAGE_NETWORK_CONFIG = 7 }; /** @@ -222,37 +245,28 @@ public: /** * Send this packet via another node in this cluster if another node has this peer * + * This is used in the outgoing packet and relaying logic in Switch to + * relay packets to other cluster members. It isn't PROXY_SEND-- that is + * used internally in Cluster to send responses to peer queries. + * * @param fromPeerAddress Source peer address (if known, should be NULL for fragments) * @param toPeerAddress Destination peer address * @param data Packet or packet fragment data * @param len Length of packet or fragment * @param unite If true, also request proxy unite across cluster - * @return True if this data was sent via another cluster member, false if none have this peer */ - bool sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite); + void sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite); /** - * Advertise to the cluster that we have this peer + * Send a distributed query to other cluster members * - * @param peerId Identity of peer that we have - */ - void replicateHavePeer(const Identity &peerId); - - /** - * Advertise a multicast LIKE to the cluster + * Some queries such as WHOIS or MULTICAST_GATHER need a response from other + * cluster members. Replies (if any) will be sent back to the peer via + * PROXY_SEND across the cluster. * - * @param nwid Network ID - * @param peerAddress Peer address that sent LIKE - * @param group Multicast group + * @param pkt Packet to distribute */ - void replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group); - - /** - * Advertise a network COM to the cluster - * - * @param com Certificate of network membership (contains peer and network ID) - */ - void replicateCertificateOfNetworkMembership(const CertificateOfMembership &com); + void sendDistributedQuery(const Packet &pkt); /** * Call every ~ZT_CLUSTER_PERIODIC_TASK_PERIOD milliseconds. 
@@ -295,6 +309,9 @@ private: void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len); void _flush(uint16_t memberId); + void _doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep); + void _doREMOTE_MULTICAST_GATHER(uint64_t fromMemberId,const Packet &remotep); + // These are initialized in the constructor and remain immutable uint16_t _masterSecret[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; unsigned char _key[ZT_PEER_SECRET_KEY_LENGTH]; @@ -348,10 +365,28 @@ private: std::vector _memberIds; Mutex _memberIds_m; - Hashtable< Address,unsigned int > _peerAffinities; - Mutex _peerAffinities_m; + std::map< std::pair,uint64_t > _remotePeers; // we need ordered behavior and lower_bound here + Mutex _remotePeers_m; + + struct _SQE + { + _SQE() : timestamp(0),len(0),unite(false) {} + _SQE(const uint64_t ts,const Address &t,const void *d,const unsigned int l,const bool u) : + timestamp(ts), + toPeerAddress(t), + len(l), + unite(u) { memcpy(data,d,l); } + uint64_t timestamp; + Address toPeerAddress; + unsigned int len; + bool unite; + unsigned char data[ZT_PROTO_MAX_PACKET_LENGTH]; + }; + std::multimap _sendViaClusterQueue; // queue by from peer address + Mutex _sendViaClusterQueue_m; uint64_t _lastFlushed; + uint64_t _lastCleanedRemotePeers; }; } // namespace ZeroTier diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index 26339b003..d655b8561 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -133,10 +133,7 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,const SharedPtr switch(errorCode) { case Packet::ERROR_OBJ_NOT_FOUND: - if (inReVerb == Packet::VERB_WHOIS) { - if (RR->topology->isUpstream(peer->identity())) - RR->sw->cancelWhoisRequest(Address(field(ZT_PROTO_VERB_ERROR_IDX_PAYLOAD,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH)); - } else if (inReVerb == Packet::VERB_NETWORK_CONFIG_REQUEST) { + if (inReVerb == Packet::VERB_NETWORK_CONFIG_REQUEST) { SharedPtr network(RR->node->network(at(ZT_PROTO_VERB_ERROR_IDX_PAYLOAD))); if ((network)&&(network->controller() == peer->address())) network->setNotFound(); @@ -237,7 +234,7 @@ bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR,SharedPtr &peer return true; } - if (!peer) { + if (!peer) { // peer == NULL is the normal case here peer = RR->topology->getPeer(id.address()); if (peer) { // We already have an identity with this address -- check for collisions @@ -452,10 +449,6 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,const SharedPtr &p CertificateOfMembership com; offset += com.deserialize(*this,ZT_PROTO_VERB_MULTICAST_FRAME__OK__IDX_COM_AND_GATHER_RESULTS); peer->validateAndSetNetworkMembershipCertificate(RR,nwid,com); -#ifdef ZT_ENABLE_CLUSTER - if (RR->cluster) - RR->cluster->replicateCertificateOfNetworkMembership(com); -#endif } if ((flags & 0x02) != 0) { @@ -491,14 +484,10 @@ bool IncomingPacket::_doWHOIS(const RuntimeEnvironment *RR,const SharedPtr RR->antiRec->logOutgoingZT(outp.data(),outp.size()); RR->node->putPacket(_localAddress,_remoteAddress,outp.data(),outp.size()); } else { - Packet outp(peer->address(),RR->identity.address(),Packet::VERB_ERROR); - outp.append((unsigned char)Packet::VERB_WHOIS); - outp.append(packetId()); - outp.append((unsigned char)Packet::ERROR_OBJ_NOT_FOUND); - outp.append(payload(),ZT_ADDRESS_LENGTH); - outp.armor(peer->key(),true); - RR->antiRec->logOutgoingZT(outp.data(),outp.size()); - RR->node->putPacket(_localAddress,_remoteAddress,outp.data(),outp.size()); +#ifdef ZT_ENABLE_CLUSTER + if (RR->cluster) + 
RR->cluster->sendDistributedQuery(*this); +#endif } } else { TRACE("dropped WHOIS from %s(%s): missing or invalid address",source().toString().c_str(),_remoteAddress.toString().c_str()); @@ -585,10 +574,6 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,const SharedPtr
validateAndSetNetworkMembershipCertificate(RR,network->id(),com); -#ifdef ZT_ENABLE_CLUSTER - if (RR->cluster) - RR->cluster->replicateCertificateOfNetworkMembership(com); -#endif } if (!network->isAllowed(peer)) { @@ -674,10 +659,6 @@ bool IncomingPacket::_doMULTICAST_LIKE(const RuntimeEnvironment *RR,const Shared const uint64_t nwid = at(ptr); const MulticastGroup group(MAC(field(ptr + 8,6),6),at(ptr + 14)); RR->mc->add(now,nwid,group,peer->address()); -#ifdef ZT_ENABLE_CLUSTER - if (RR->cluster) - RR->cluster->replicateMulticastLike(nwid,peer->address(),group); -#endif } peer->received(RR,_localAddress,_remoteAddress,hops(),packetId(),Packet::VERB_MULTICAST_LIKE,0,Packet::VERB_NOP); @@ -696,10 +677,6 @@ bool IncomingPacket::_doNETWORK_MEMBERSHIP_CERTIFICATE(const RuntimeEnvironment while (ptr < size()) { ptr += com.deserialize(*this,ptr); peer->validateAndSetNetworkMembershipCertificate(RR,com.networkId(),com); -#ifdef ZT_ENABLE_CLUSTER - if (RR->cluster) - RR->cluster->replicateCertificateOfNetworkMembership(com); -#endif } peer->received(RR,_localAddress,_remoteAddress,hops(),packetId(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE,0,Packet::VERB_NOP); @@ -831,11 +808,17 @@ bool IncomingPacket::_doMULTICAST_GATHER(const RuntimeEnvironment *RR,const Shar outp.append(nwid); mg.mac().appendTo(outp); outp.append((uint32_t)mg.adi()); - if (RR->mc->gather(peer->address(),nwid,mg,outp,gatherLimit)) { + const unsigned int gatheredLocally = RR->mc->gather(peer->address(),nwid,mg,outp,gatherLimit); + if (gatheredLocally) { outp.armor(peer->key(),true); RR->antiRec->logOutgoingZT(outp.data(),outp.size()); RR->node->putPacket(_localAddress,_remoteAddress,outp.data(),outp.size()); } + +#ifdef ZT_ENABLE_CLUSTER + if ((RR->cluster)&&(gatheredLocally < gatherLimit)) + RR->cluster->sendDistributedQuery(*this); +#endif } peer->received(RR,_localAddress,_remoteAddress,hops(),packetId(),Packet::VERB_MULTICAST_GATHER,0,Packet::VERB_NOP); @@ -860,10 +843,6 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,const Share CertificateOfMembership com; offset += com.deserialize(*this,ZT_PROTO_VERB_MULTICAST_FRAME_IDX_COM); peer->validateAndSetNetworkMembershipCertificate(RR,nwid,com); -#ifdef ZT_ENABLE_CLUSTER - if (RR->cluster) - RR->cluster->replicateCertificateOfNetworkMembership(com); -#endif } // Check membership after we've read any included COM, since diff --git a/node/Packet.hpp b/node/Packet.hpp index 63c49ce33..ef0251e33 100644 --- a/node/Packet.hpp +++ b/node/Packet.hpp @@ -322,8 +322,6 @@ #define ZT_PROTO_VERB_WHOIS__OK__IDX_IDENTITY (ZT_PROTO_VERB_OK_IDX_PAYLOAD) -#define ZT_PROTO_VERB_WHOIS__ERROR__IDX_ZTADDRESS (ZT_PROTO_VERB_ERROR_IDX_PAYLOAD) - #define ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST__OK__IDX_NETWORK_ID (ZT_PROTO_VERB_OK_IDX_PAYLOAD) #define ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST__OK__IDX_DICT_LEN (ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST__OK__IDX_NETWORK_ID + 8) #define ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST__OK__IDX_DICT (ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST__OK__IDX_DICT_LEN + 2) @@ -599,8 +597,11 @@ public: * OK response payload: * <[...] binary serialized identity> * - * ERROR response payload: - * <[5] address> + * If querying a cluster, duplicate OK responses may occasionally occur. + * These should be discarded. + * + * If the address is not found, no response is generated. WHOIS requests + * will time out much like ARP requests and similar do in L2. 
*/ VERB_WHOIS = 4, @@ -785,6 +786,9 @@ public: * to send multicast but does not have the desired number of recipient * peers. * + * More than one OK response can occur if the response is broken up across + * multiple packets or if querying a clustered node. + * * OK response payload: * <[8] 64-bit network ID> * <[6] MAC address of multicast group being queried> @@ -794,16 +798,7 @@ public: * <[2] 16-bit number of members enumerated in this packet> * <[...] series of 5-byte ZeroTier addresses of enumerated members> * - * If no endpoints are known, OK and ERROR are both optional. It's okay - * to return nothing in that case since gathering is "lazy." - * - * ERROR response payload: - * <[8] 64-bit network ID> - * <[6] MAC address of multicast group being queried> - * <[4] 32-bit ADI for multicast group being queried> - * - * ERRORs are optional and are only generated if permission is denied, - * certificate of membership is out of date, etc. + * ERROR is not generated; queries that return no response are dropped. */ VERB_MULTICAST_GATHER = 13, @@ -1118,7 +1113,7 @@ public: /* Bad/unsupported protocol version */ ERROR_BAD_PROTOCOL_VERSION = 2, - /* Unknown object queried (e.g. with WHOIS) */ + /* Unknown object queried */ ERROR_OBJ_NOT_FOUND = 3, /* HELLO pushed an identity whose address is already claimed */ diff --git a/node/Peer.cpp b/node/Peer.cpp index 52727c78f..6f987da2f 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -156,14 +156,7 @@ void Peer::received( } } - if (pathIsConfirmed) { - -#ifdef ZT_ENABLE_CLUSTER - if ((RR->cluster)&&(verb == Packet::VERB_HELLO)) - RR->cluster->replicateHavePeer(_id); -#endif - - } else { + if (!pathIsConfirmed) { if (verb == Packet::VERB_OK) { Path *slot = (Path *)0; @@ -186,11 +179,6 @@ void Peer::received( _sortPaths(now); } -#ifdef ZT_ENABLE_CLUSTER - if (RR->cluster) - RR->cluster->replicateHavePeer(_id); -#endif - } else { /* If this path is not known, send a HELLO. We don't learn
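
The heart of this refactor is the replacement of the _peerAffinities hashtable with an ordered std::map keyed by (peer address, member ID) whose value is the time a HAVE_PEER was last received for that pair: sendViaCluster() does a lower_bound() scan over all entries for an address to pick the member that most recently confirmed the peer, and doPeriodicTasks() expires entries older than ZT_PEER_ACTIVITY_TIMEOUT. Below is a minimal standalone sketch of that pattern, not the actual Cluster class: Address is reduced to uint64_t, all locking is omitted, and the timeout constant is illustrative.

// Minimal sketch (not the real ZeroTier Cluster class) of the ordered
// (address, member) -> last-HAVE_PEER-time map introduced by this patch.
// "Address" is simplified to uint64_t and all locking is omitted.
#include <cstdint>
#include <map>
#include <utility>

typedef uint64_t Address; // stand-in for ZeroTier's 40-bit Address class

static const uint64_t ZT_PEER_ACTIVITY_TIMEOUT = 500000; // illustrative value, not the real constant

class RemotePeerCache
{
public:
	// Record that cluster member 'memberId' announced HAVE_PEER for 'peer' at time 'now'
	void learnedHavePeer(const Address &peer,unsigned int memberId,uint64_t now)
	{
		_remotePeers[std::make_pair(peer,memberId)] = now;
	}

	// Pick the member that most recently confirmed 'peer'. Returns false if no
	// entry is fresh enough, in which case the caller broadcasts WANT_PEER and
	// queues the packet for a later retry.
	bool mostRecentMemberFor(const Address &peer,uint64_t now,unsigned int &memberId) const
	{
		uint64_t mostRecentTs = 0;
		std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::make_pair(peer,(unsigned int)0)));
		while ((rpe != _remotePeers.end())&&(rpe->first.first == peer)) {
			if (rpe->second > mostRecentTs) {
				mostRecentTs = rpe->second;
				memberId = rpe->first.second;
			}
			++rpe; // advance each pass so the scan terminates
		}
		return ((mostRecentTs > 0)&&((now - mostRecentTs) < ZT_PEER_ACTIVITY_TIMEOUT));
	}

	// Periodic cleanup, as in doPeriodicTasks(): drop entries older than the timeout
	void clean(uint64_t now)
	{
		for(std::map< std::pair<Address,unsigned int>,uint64_t >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) {
			if ((now - rp->second) >= ZT_PEER_ACTIVITY_TIMEOUT)
				_remotePeers.erase(rp++);
			else ++rp;
		}
	}

private:
	// std::map rather than Hashtable because ordered lower_bound iteration is needed
	std::map< std::pair<Address,unsigned int>,uint64_t > _remotePeers;
};

An ordered map is used here instead of the old Hashtable because, as the header comment in the patch notes, lower_bound() gives cheap iteration over every (address, member) pair for one address, which a hash table cannot provide.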
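
When no cluster member can vouch for the destination, sendViaCluster() now parks the packet in _sendViaClusterQueue, a multimap keyed by the sending peer's address and bounded to ZT_CLUSTER_MAX_QUEUE_PER_SENDER entries per sender; queued entries are drained and retried when a HAVE_PEER arrives for that address and are expired after ZT_CLUSTER_QUEUE_EXPIRATION milliseconds by doPeriodicTasks(). A rough standalone sketch of that queue follows, using a value type with std::vector in place of the patch's heap-allocated fixed-buffer _SQE objects and again omitting locking.

// Rough sketch (not the real _sendViaClusterQueue) of the bounded, expiring
// per-sender packet queue added by this patch. Entries are value types here;
// the actual code heap-allocates fixed-size _SQE structures.
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

typedef uint64_t Address; // stand-in for ZeroTier's Address class

static const unsigned int ZT_CLUSTER_MAX_QUEUE_PER_SENDER = 8;
static const uint64_t ZT_CLUSTER_QUEUE_EXPIRATION = 2500; // milliseconds

struct QueuedSend
{
	uint64_t timestamp;
	Address toPeerAddress;
	std::vector<unsigned char> data;
	bool unite;
};

class SendViaClusterQueue
{
private:
	typedef std::multimap<Address,QueuedSend> Queue;
	Queue _q;

public:
	// Park a packet from 'from' to 'to'; silently drops if the per-sender cap is reached
	void enqueue(const Address &from,const Address &to,const void *data,unsigned int len,bool unite,uint64_t now)
	{
		if (_q.count(from) < ZT_CLUSTER_MAX_QUEUE_PER_SENDER) {
			QueuedSend qs;
			qs.timestamp = now;
			qs.toPeerAddress = to;
			qs.data.assign((const unsigned char *)data,(const unsigned char *)data + len);
			qs.unite = unite;
			_q.insert(std::make_pair(from,qs));
		}
	}

	// On HAVE_PEER for 'addr': pull everything queued under that address so the
	// caller can re-run sendViaCluster() for each entry outside the queue lock
	std::vector<QueuedSend> drain(const Address &addr)
	{
		std::vector<QueuedSend> out;
		std::pair<Queue::iterator,Queue::iterator> er(_q.equal_range(addr));
		for(Queue::iterator qi(er.first);qi!=er.second;) {
			out.push_back(qi->second);
			_q.erase(qi++);
		}
		return out;
	}

	// Periodic sweep, as in doPeriodicTasks(): expire stale entries
	void expire(uint64_t now)
	{
		for(Queue::iterator qi(_q.begin());qi!=_q.end();) {
			if ((now - qi->second.timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION)
				_q.erase(qi++);
			else ++qi;
		}
	}
};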