From 33ad3deaee06df288731064aa0663cb2806ebea0 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 21 Mar 2014 13:46:55 -0700 Subject: [PATCH] Builds with new Path code. --- node/Node.cpp | 4 ++-- node/NodeConfig.cpp | 28 ++++++++++++-------------- node/PacketDecoder.cpp | 24 +++++++++++----------- node/Path.hpp | 8 ++++---- node/Peer.cpp | 2 +- node/Peer.hpp | 45 +++++++++++++++++++++++++++++++----------- node/SocketManager.cpp | 8 ++++---- node/Topology.cpp | 5 +++-- node/Topology.hpp | 32 +++++++++++++++++------------- 9 files changed, 91 insertions(+), 65 deletions(-) diff --git a/node/Node.cpp b/node/Node.cpp index a4f2c3afa..1ffc0e79b 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -581,7 +581,7 @@ Node::ReasonForTermination Node::run() std::vector< SharedPtr > sns(_r->topology->supernodePeers()); TRACE("pinging %d supernodes",(int)sns.size()); for(std::vector< SharedPtr >::const_iterator p(sns.begin());p!=sns.end();++p) - (*p)->sendPing(_r,now); + (*p)->sendPing(_r,now,resynchronize); } if (resynchronize) { @@ -618,7 +618,7 @@ Node::ReasonForTermination Node::run() if ((now - lastPingCheck) >= ZT_PING_CHECK_DELAY) { lastPingCheck = now; try { - _r->topology->eachPeer(Topology::PingPeersThatNeedPing(_r,now)); + _r->topology->eachPeer(Topology::PingPeersThatNeedPing(_r,now,resynchronize)); _r->topology->eachPeer(Topology::OpenPeersThatNeedFirewallOpener(_r,now)); } catch (std::exception &exc) { LOG("unexpected exception running ping check cycle: %s",exc.what()); diff --git a/node/NodeConfig.cpp b/node/NodeConfig.cpp index 57c655b79..6f1224983 100644 --- a/node/NodeConfig.cpp +++ b/node/NodeConfig.cpp @@ -156,20 +156,18 @@ public: } inline void operator()(Topology &t,const SharedPtr &p) { - InetAddress v4(p->ipv4ActivePath(now)); - InetAddress v6(p->ipv6ActivePath(now)); - if ((v4)||(v6)) { - ipcc->printf("200 listpeers %s %s %s %u %s"ZT_EOL_S, - p->address().toString().c_str(), - ((v4) ? v4.toString().c_str() : "-"), - ((v6) ? v6.toString().c_str() : "-"), - p->latency(), - p->remoteVersion().c_str()); - } else { - ipcc->printf("200 listpeers %s - - - %s"ZT_EOL_S, - p->address().toString().c_str(), - p->remoteVersion().c_str()); + std::vector pp(p->paths()); + std::string pathsStr; + for(std::vector::const_iterator ppp(pp.begin());ppp!=pp.end();++ppp) { + if (pathsStr.length()) + pathsStr.push_back(','); + pathsStr.append(ppp->toString()); } + ipcc->printf("200 listpeers %s %s %u %s"ZT_EOL_S, + p->address().toString().c_str(), + ((pathsStr.length() > 0) ? pathsStr.c_str() : "-"), + p->latency(), + p->remoteVersion().c_str()); } IpcConnection *ipcc; uint64_t now; @@ -216,7 +214,7 @@ void NodeConfig::_doCommand(IpcConnection *ipcc,const char *commandLine) std::vector< SharedPtr > snp(_r->topology->supernodePeers()); for(std::vector< SharedPtr >::const_iterator sn(snp.begin());sn!=snp.end();++sn) { uint64_t lastRec = (*sn)->lastDirectReceive(); - if ((lastRec)&&(lastRec > since)&&((now - lastRec) < ZT_PEER_LINK_ACTIVITY_TIMEOUT)) { + if ((lastRec)&&(lastRec > since)&&((now - lastRec) < ZT_PEER_PATH_ACTIVITY_TIMEOUT)) { isOnline = true; break; } @@ -224,7 +222,7 @@ void NodeConfig::_doCommand(IpcConnection *ipcc,const char *commandLine) ipcc->printf("200 info %s %s %s"ZT_EOL_S,_r->identity.address().toString().c_str(),(isOnline ? "ONLINE" : "OFFLINE"),Node::versionString()); } else if (cmd[0] == "listpeers") { - ipcc->printf("200 listpeers "ZT_EOL_S); + ipcc->printf("200 listpeers "ZT_EOL_S); _r->topology->eachPeer(_DumpPeerStatistics(ipcc)); } else if (cmd[0] == "listnetworks") { Mutex::Lock _l(_networks_m); diff --git a/node/PacketDecoder.cpp b/node/PacketDecoder.cpp index a1336de24..43ed8c696 100644 --- a/node/PacketDecoder.cpp +++ b/node/PacketDecoder.cpp @@ -82,7 +82,7 @@ bool PacketDecoder::tryDecode(const RuntimeEnvironment *_r) switch(verb()) { case Packet::VERB_NOP: - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_NOP,0,Packet::VERB_NOP,Utils::now()); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_NOP,0,Packet::VERB_NOP,Utils::now()); return true; case Packet::VERB_HELLO: return _doHELLO(_r); // legal, but why? :) @@ -156,7 +156,7 @@ bool PacketDecoder::_doERROR(const RuntimeEnvironment *_r,const SharedPtr break; } - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_ERROR,inRePacketId,inReVerb,Utils::now()); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_ERROR,inRePacketId,inReVerb,Utils::now()); } catch (std::exception &ex) { TRACE("dropped ERROR from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what()); } catch ( ... ) { @@ -246,7 +246,7 @@ bool PacketDecoder::_doHELLO(const RuntimeEnvironment *_r) peer = _r->topology->addPeer(newPeer); } - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_HELLO,0,Packet::VERB_NOP,Utils::now()); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_HELLO,0,Packet::VERB_NOP,Utils::now()); peer->setRemoteVersion(vMajor,vMinor,vRevision); // If a supernode has a version higher than ours, this causes a software @@ -324,7 +324,7 @@ bool PacketDecoder::_doOK(const RuntimeEnvironment *_r,const SharedPtr &pe break; } - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_OK,inRePacketId,inReVerb,Utils::now()); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_OK,inRePacketId,inReVerb,Utils::now()); } catch (std::exception &ex) { TRACE("dropped OK from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what()); } catch ( ... ) { @@ -358,7 +358,7 @@ bool PacketDecoder::_doWHOIS(const RuntimeEnvironment *_r,const SharedPtr } else { TRACE("dropped WHOIS from %s(%s): missing or invalid address",source().toString().c_str(),_remoteAddress.toString().c_str()); } - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_WHOIS,0,Packet::VERB_NOP,Utils::now()); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_WHOIS,0,Packet::VERB_NOP,Utils::now()); return true; } @@ -387,7 +387,7 @@ bool PacketDecoder::_doRENDEZVOUS(const RuntimeEnvironment *_r,const SharedPtr

0)&&((addrlen == 4)||(addrlen == 16))) { InetAddress atAddr(field(ZT_PROTO_VERB_RENDEZVOUS_IDX_ADDRESS,addrlen),addrlen,port); TRACE("RENDEZVOUS from %s says %s might be at %s, starting NAT-t",source().toString().c_str(),with.toString().c_str(),atAddr.toString().c_str()); - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_RENDEZVOUS,0,Packet::VERB_NOP,Utils::now()); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_RENDEZVOUS,0,Packet::VERB_NOP,Utils::now()); _r->sw->contact(withPeer,atAddr); } else { TRACE("dropped corrupt RENDEZVOUS from %s(%s) (bad address or port)",source().toString().c_str(),_remoteAddress.toString().c_str()); @@ -426,7 +426,7 @@ bool PacketDecoder::_doFRAME(const RuntimeEnvironment *_r,const SharedPtr // we receive unicast frames from it. This is called "implicit social // ordering" in other docs. _r->mc->bringCloser(network->id(),source()); - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_FRAME,0,Packet::VERB_NOP,Utils::now()); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_FRAME,0,Packet::VERB_NOP,Utils::now()); } else { TRACE("dropped FRAME from %s(%s): sender not a member of closed network %.16llx",source().toString().c_str(),_remoteAddress.toString().c_str(),network->id()); @@ -583,7 +583,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared #endif // At this point the frame is basically valid, so we can call it a receive - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,Utils::now()); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,Utils::now()); // This gets updated later in most cases but start with the global limit. unsigned int maxDepth = ZT_MULTICAST_GLOBAL_MAX_DEPTH; @@ -834,7 +834,7 @@ bool PacketDecoder::_doMULTICAST_LIKE(const RuntimeEnvironment *_r,const SharedP } } - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_MULTICAST_LIKE,0,Packet::VERB_NOP,now); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_MULTICAST_LIKE,0,Packet::VERB_NOP,now); } catch (std::exception &ex) { TRACE("dropped MULTICAST_LIKE from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what()); } catch ( ... ) { @@ -870,7 +870,7 @@ bool PacketDecoder::_doNETWORK_MEMBERSHIP_CERTIFICATE(const RuntimeEnvironment * } } - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE,0,Packet::VERB_NOP,Utils::now()); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE,0,Packet::VERB_NOP,Utils::now()); } catch (std::exception &ex) { TRACE("dropped NETWORK_MEMBERSHIP_CERTIFICATE from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what()); } catch ( ... ) { @@ -913,7 +913,7 @@ bool PacketDecoder::_doNETWORK_CONFIG_REQUEST(const RuntimeEnvironment *_r,const #ifndef __WINDOWS__ } #endif // !__WINDOWS__ - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_NETWORK_CONFIG_REQUEST,0,Packet::VERB_NOP,Utils::now()); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_NETWORK_CONFIG_REQUEST,0,Packet::VERB_NOP,Utils::now()); } catch (std::exception &exc) { TRACE("dropped NETWORK_CONFIG_REQUEST from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),exc.what()); } catch ( ... ) { @@ -934,7 +934,7 @@ bool PacketDecoder::_doNETWORK_CONFIG_REFRESH(const RuntimeEnvironment *_r,const nw->requestConfiguration(); } } - peer->onReceive(_r,_remoteAddress,hops(),packetId(),Packet::VERB_NETWORK_CONFIG_REFRESH,0,Packet::VERB_NOP,Utils::now()); + peer->onReceive(_r,_fromSock,_remoteAddress,hops(),packetId(),Packet::VERB_NETWORK_CONFIG_REFRESH,0,Packet::VERB_NOP,Utils::now()); } catch (std::exception &exc) { TRACE("dropped NETWORK_CONFIG_REFRESH from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),exc.what()); } catch ( ... ) { diff --git a/node/Path.hpp b/node/Path.hpp index 21ef71bb9..960b08b35 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -111,12 +111,12 @@ public: Utils::snprintf(lrago,sizeof(lrago),"%lld",(long long)((_lastReceived != 0) ? (now - _lastReceived) : -1)); Utils::snprintf(lfoago,sizeof(lfoago),"%lld",(long long)((_lastFirewallOpener != 0) ? (now - _lastFirewallOpener) : -1)); Utils::snprintf(lpago,sizeof(lfoago),"%lld",(long long)((_lastPing != 0) ? (now - _lastPing) : -1)); - return (std::string(_tcp ? "tcp:" : "udp:") + _addr.toString() + "[" + lsago + "," lrago + "," + lpago + "," + lfoago + "," + (active(now) ? "active" : "inactive") + "," + (_fixed ? "fixed" : "learned") + "]"); + return (_addr.toString() + "[" + (_tcp ? "tcp" : "udp") + ";" + lsago + ";" + lrago + ";" + lpago + ";" + lfoago + ";" + (active(now) ? "active" : "inactive") + ";" + (_fixed ? "fixed" : "learned") + "]"); } - inline operator==(const Path &p) const throw() { return ((_addr == p._addr)&&(_tcp == p._tcp)); } - inline operator!=(const Path &p) const throw() { return ((_addr != p._addr)||(_tcp != p._tcp)); } - inline operator<(const Path &p) const + inline bool operator==(const Path &p) const throw() { return ((_addr == p._addr)&&(_tcp == p._tcp)); } + inline bool operator!=(const Path &p) const throw() { return ((_addr != p._addr)||(_tcp != p._tcp)); } + inline bool operator<(const Path &p) const throw() { if (_addr == p._addr) { diff --git a/node/Peer.cpp b/node/Peer.cpp index ce2038901..3d4b34840 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -112,7 +112,7 @@ bool Peer::send(const RuntimeEnvironment *_r,const void *data,unsigned int len,u uint64_t bestPathLastReceived = 0; std::vector::iterator bestPath; for(std::vector::iterator p(_paths.begin());p!=_paths.end();++p) { - uint64_t lr = p->lastRecevied(); + uint64_t lr = p->lastReceived(); if (lr >= bestPathLastReceived) { bestPathLastReceived = lr; bestPath = p; diff --git a/node/Peer.hpp b/node/Peer.hpp index 2e19ba149..d9405b948 100644 --- a/node/Peer.hpp +++ b/node/Peer.hpp @@ -199,7 +199,7 @@ public: uint64_t x = 0; Mutex::Lock _l(_lock); for(std::vector::const_iterator p(_paths.begin());p!=_paths.end();++p) { - uint64_t l = p->lastReceive(); + uint64_t l = p->lastReceived(); if (l > x) x = l; } @@ -316,7 +316,7 @@ public: inline void addPath(const Path &newp) { Mutex::Lock _l(_lock); - for(std::vector::const_iterator p(_paths.begin());p!=_paths.end();++p) { + for(std::vector::iterator p(_paths.begin());p!=_paths.end();++p) { if (*p == newp) { p->setFixed(newp.fixed()); return; @@ -380,25 +380,48 @@ public: */ inline operator bool() const throw() { return (_id); } + /** + * @param now Current time + * @param v4 Result parameter to receive active IPv4 address, if any + * @param v6 Result parameter to receive active IPv6 address, if any + */ + inline void getActiveUdpPathAddresses(uint64_t now,InetAddress &v4,InetAddress &v6) const + { + bool gotV4 = false,gotV6 = false; + Mutex::Lock _l(_lock); + for(std::vector::const_iterator p(_paths.begin());p!=_paths.end();++p) { + if (!gotV4) { + if ((!p->tcp())&&(p->address().isV4())&&(p->active(now))) { + gotV4 = true; + v4 = p->address(); + } + } else if (!gotV6) { + if ((!p->tcp())&&(p->address().isV6())&&(p->active(now))) { + gotV6 = true; + v6 = p->address(); + } + } else break; + } + } + /** * Find a common set of addresses by which two peers can link, if any * * @param a Peer A * @param b Peer B * @param now Current time - * @return Pair: B's address to send to A, A's address to send to B + * @return Pair: B's address (to send to A), A's address (to send to B) */ static inline std::pair findCommonGround(const Peer &a,const Peer &b,uint64_t now) throw() { - if ((a._ipv6p.isActive(now))&&(b._ipv6p.isActive(now))) - return std::pair(b._ipv6p.addr,a._ipv6p.addr); - else if ((a._ipv4p.isActive(now))&&(b._ipv4p.isActive(now))) - return std::pair(b._ipv4p.addr,a._ipv4p.addr); - else if ((a._ipv6p.addr)&&(b._ipv6p.addr)) - return std::pair(b._ipv6p.addr,a._ipv6p.addr); - else if ((a._ipv4p.addr)&&(b._ipv4p.addr)) - return std::pair(b._ipv4p.addr,a._ipv4p.addr); + std::pair v4,v6; + b.getActiveUdpPathAddresses(now,v4.first,v6.first); + a.getActiveUdpPathAddresses(now,v4.second,v6.second); + if ((v6.first)&&(v6.second)) + return v6; + if ((v4.first)&&(v4.second)) + return v4; return std::pair(); } diff --git a/node/SocketManager.cpp b/node/SocketManager.cpp index a977a5461..dc3e1a04d 100644 --- a/node/SocketManager.cpp +++ b/node/SocketManager.cpp @@ -577,14 +577,14 @@ void SocketManager::whack() _whackSendPipe_m.unlock(); } -void closeTcpSockets() +void SocketManager::closeTcpSockets() { { Mutex::Lock _l2(_tcpSockets_m); _fdSetLock.lock(); - for(std::map< InetAddress,SharedPtr >::iterator s(_tcpSockets.begin());s!=_tcpSockets.end();++s`) { - FD_CLR((*s)->_sock,&_readfds); - FD_CLR((*s)->_sock,&_writefds); + for(std::map< InetAddress,SharedPtr >::iterator s(_tcpSockets.begin());s!=_tcpSockets.end();++s) { + FD_CLR(s->second->_sock,&_readfds); + FD_CLR(s->second->_sock,&_writefds); } _fdSetLock.unlock(); _tcpSockets.clear(); diff --git a/node/Topology.cpp b/node/Topology.cpp index b4bbfadb3..d36676547 100644 --- a/node/Topology.cpp +++ b/node/Topology.cpp @@ -64,8 +64,9 @@ void Topology::setSupernodes(const std::map< Identity,std::vector > SharedPtr p(getPeer(i->first.address())); if (!p) p = addPeer(SharedPtr(new Peer(_r->identity,i->first))); - for(std::vector::const_iterator j(i->second.begin());j!=i->second.end();++j) - p->setPathAddress(*j,true); + for(std::vector::const_iterator j(i->second.begin());j!=i->second.end();++j) { + p->addPath(Path(*j,false,true)); + } p->setLastUsed(now); _supernodePeers.push_back(p); } diff --git a/node/Topology.hpp b/node/Topology.hpp index e575cb8dd..0b72197dd 100644 --- a/node/Topology.hpp +++ b/node/Topology.hpp @@ -216,31 +216,35 @@ public: class PingPeersThatNeedPing { public: - PingPeersThatNeedPing(const RuntimeEnvironment *renv,uint64_t now) throw() : + PingPeersThatNeedPing(const RuntimeEnvironment *renv,uint64_t now,bool firstSinceReset) throw() : _now(now), _supernodeAddresses(renv->topology->supernodeAddresses()), - _r(renv) {} + _r(renv), + _firstSinceReset(firstSinceReset) {} inline void operator()(Topology &t,const SharedPtr &p) { if ( - ((_now - p->lastDirectReceive()) >= ZT_PEER_DIRECT_PING_DELAY) && - ( - ( - (p->hasDirectPath())&& - ((_now - p->lastFrame()) < ZT_PEER_LINK_ACTIVITY_TIMEOUT) - ) && - (!_supernodeAddresses.count(p->address())) - ) - ) { - p->sendPing(_r,_now); - } + /* 1: we have not heard anything directly in ZT_PEER_DIRECT_PING_DELAY ms */ + ((_now - p->lastDirectReceive()) >= ZT_PEER_DIRECT_PING_DELAY) && + /* 2: */ + ( + /* 2a: peer has direct path, and has sent us something recently */ + ( + (p->hasDirectPath())&& + ((_now - p->lastFrame()) < ZT_PEER_PATH_ACTIVITY_TIMEOUT) + ) && + /* 2b: peer is not a supernode */ + (!_supernodeAddresses.count(p->address())) + ) + ) { p->sendPing(_r,_now,_firstSinceReset); } } private: uint64_t _now; std::set

_supernodeAddresses; const RuntimeEnvironment *_r; + bool _firstSinceReset; }; /** @@ -261,7 +265,7 @@ public: { if (!_supernodeAddresses.count(p->address())) { p->clearPaths(false); // false means don't forget 'fixed' paths e.g. supernodes - if (((_now - p->lastFrame()) < ZT_PEER_LINK_ACTIVITY_TIMEOUT)&&(_supernode)) { + if (((_now - p->lastFrame()) < ZT_PEER_PATH_ACTIVITY_TIMEOUT)&&(_supernode)) { TRACE("sending reset NOP to %s",p->address().toString().c_str()); Packet outp(p->address(),_r->identity.address(),Packet::VERB_NOP); outp.armor(p->key(),false); // no need to encrypt a NOP