More tweaks to algorithm for determining when to fail over to TCP, and stop supernodes from resynchronizing unless explicitly ordered.

This commit is contained in:
Adam Ierymenko 2014-04-01 18:39:10 -07:00
parent 0e1fc06a6f
commit 700a450806
6 changed files with 71 additions and 44 deletions

View File

@ -305,9 +305,14 @@ error_no_byte_order_defined;
#define ZT_TCP_TUNNEL_ACTIVITY_TIMEOUT ZT_PEER_PATH_ACTIVITY_TIMEOUT #define ZT_TCP_TUNNEL_ACTIVITY_TIMEOUT ZT_PEER_PATH_ACTIVITY_TIMEOUT
/** /**
* Try TCP tunnels if no response to UDP PINGs in this many milliseconds * Try TCP tunnels if nothing received for this long
*/ */
#define ZT_PING_UNANSWERED_AFTER 5000 #define ZT_TCP_TUNNEL_FAILOVER_TIMEOUT 5000
/**
* Try to ping supernodes this often until we get something from somewhere
*/
#define ZT_STARTUP_AGGRO 5000
/** /**
* Stop relaying via peers that have not responded to direct sends in this long * Stop relaying via peers that have not responded to direct sends in this long

View File

@ -540,7 +540,7 @@ Node::ReasonForTermination Node::run()
long lastDelayDelta = 0; long lastDelayDelta = 0;
uint64_t networkConfigurationFingerprint = 0; uint64_t networkConfigurationFingerprint = 0;
_r->timeOfLastResynchronize = 0; _r->timeOfLastResynchronize = Utils::now();
while (impl->reasonForTermination == NODE_RUNNING) { while (impl->reasonForTermination == NODE_RUNNING) {
if (Utils::fileExists(shutdownIfUnreadablePath.c_str(),false)) { if (Utils::fileExists(shutdownIfUnreadablePath.c_str(),false)) {
@ -551,13 +551,7 @@ Node::ReasonForTermination Node::run()
} }
uint64_t now = Utils::now(); uint64_t now = Utils::now();
bool resynchronize = false;
// Did the user send SIGHUP or otherwise order network resync? (mostly for debugging)
bool resynchronize = impl->resynchronize;
impl->resynchronize = false;
if (resynchronize) {
LOG("manual resynchronize ordered, resyncing with network");
}
// If it looks like the computer slept and woke, resynchronize. // If it looks like the computer slept and woke, resynchronize.
if (lastDelayDelta >= ZT_SLEEP_WAKE_DETECTION_THRESHOLD) { if (lastDelayDelta >= ZT_SLEEP_WAKE_DETECTION_THRESHOLD) {
@ -577,18 +571,29 @@ Node::ReasonForTermination Node::run()
} }
} }
// Supernodes do not resynchronize unless explicitly ordered via SIGHUP.
if ((resynchronize)&&(_r->topology->amSupernode()))
resynchronize = false;
// Check for SIGHUP / force resync.
if (impl->resynchronize) {
impl->resynchronize = false;
resynchronize = true;
LOG("resynchronize forced by user, syncing with network");
}
if (resynchronize) if (resynchronize)
_r->timeOfLastResynchronize = now; _r->timeOfLastResynchronize = now;
/* Ping supernodes separately, and do so more aggressively if we haven't /* Ping supernodes separately, and do so more aggressively if we haven't
* heard anything from anyone since our last resynchronize / startup. */ * heard anything from anyone since our last resynchronize / startup. */
if ( ((now - lastSupernodePing) >= ZT_PEER_DIRECT_PING_DELAY) || if ( ((now - lastSupernodePing) >= ZT_PEER_DIRECT_PING_DELAY) ||
((_r->timeOfLastResynchronize > _r->timeOfLastPacketReceived) && ((now - lastSupernodePing) >= ZT_PING_UNANSWERED_AFTER)) ) { ((_r->timeOfLastResynchronize > _r->timeOfLastPacketReceived) && ((now - lastSupernodePing) >= ZT_STARTUP_AGGRO)) ) {
lastSupernodePing = now; lastSupernodePing = now;
std::vector< SharedPtr<Peer> > sns(_r->topology->supernodePeers()); std::vector< SharedPtr<Peer> > sns(_r->topology->supernodePeers());
TRACE("pinging %d supernodes",(int)sns.size()); TRACE("pinging %d supernodes",(int)sns.size());
for(std::vector< SharedPtr<Peer> >::const_iterator p(sns.begin());p!=sns.end();++p) for(std::vector< SharedPtr<Peer> >::const_iterator p(sns.begin());p!=sns.end();++p)
(*p)->sendPing(_r,now,resynchronize); (*p)->sendPing(_r,now);
} }
if (resynchronize) { if (resynchronize) {
@ -625,7 +630,7 @@ Node::ReasonForTermination Node::run()
if ((now - lastPingCheck) >= ZT_PING_CHECK_DELAY) { if ((now - lastPingCheck) >= ZT_PING_CHECK_DELAY) {
lastPingCheck = now; lastPingCheck = now;
try { try {
_r->topology->eachPeer(Topology::PingPeersThatNeedPing(_r,now,resynchronize)); _r->topology->eachPeer(Topology::PingPeersThatNeedPing(_r,now));
_r->topology->eachPeer(Topology::OpenPeersThatNeedFirewallOpener(_r,now)); _r->topology->eachPeer(Topology::OpenPeersThatNeedFirewallOpener(_r,now));
} catch (std::exception &exc) { } catch (std::exception &exc) {
LOG("unexpected exception running ping check cycle: %s",exc.what()); LOG("unexpected exception running ping check cycle: %s",exc.what());

View File

@ -490,6 +490,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
const unsigned int signatureLen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_FRAME + frameLen); const unsigned int signatureLen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_FRAME + frameLen);
const unsigned char *const signature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_FRAME + frameLen + 2,signatureLen); const unsigned char *const signature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_FRAME + frameLen + 2,signatureLen);
/*
TRACE("MULTICAST_FRAME @%.16llx #%.16llx from %s<%s> via %s(%s) to %s [ %s, %d bytes, depth %d ]", TRACE("MULTICAST_FRAME @%.16llx #%.16llx from %s<%s> via %s(%s) to %s [ %s, %d bytes, depth %d ]",
(unsigned long long)nwid, (unsigned long long)nwid,
(unsigned long long)guid, (unsigned long long)guid,
@ -499,6 +500,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
Switch::etherTypeName(etherType), Switch::etherTypeName(etherType),
(int)frameLen, (int)frameLen,
(int)depth); (int)depth);
*/
SharedPtr<Network> network(_r->nc->network(nwid)); SharedPtr<Network> network(_r->nc->network(nwid));

View File

@ -117,8 +117,13 @@ void Peer::receive(
_lastMulticastFrame = now; _lastMulticastFrame = now;
} }
bool Peer::send(const RuntimeEnvironment *_r,const void *data,unsigned int len,uint64_t now) bool Peer::send(const RuntimeEnvironment *_r,const void *data,unsigned int len,uint64_t now)
{ {
// Note: we'll still use TCP here if that's all we have, but if this
// is false we will prefer UDP.
bool useTcp = isTcpFailoverTime(_r,now);
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
std::vector<Path>::iterator p(_paths.begin()); std::vector<Path>::iterator p(_paths.begin());
@ -127,11 +132,13 @@ bool Peer::send(const RuntimeEnvironment *_r,const void *data,unsigned int len,u
uint64_t bestPathLastReceived = p->lastReceived(); uint64_t bestPathLastReceived = p->lastReceived();
std::vector<Path>::iterator bestPath = p; std::vector<Path>::iterator bestPath = p;
bool bestPathIsTcp = p->tcp();
while (++p != _paths.end()) { while (++p != _paths.end()) {
uint64_t lr = p->lastReceived(); uint64_t lr = p->lastReceived();
if (lr > bestPathLastReceived) { if ( (lr > bestPathLastReceived) || ((bestPathIsTcp)&&(!useTcp)) ) {
bestPathLastReceived = lr; bestPathLastReceived = lr;
bestPath = p; bestPath = p;
bestPathIsTcp = p->tcp();
} }
} }
@ -156,35 +163,19 @@ bool Peer::sendFirewallOpener(const RuntimeEnvironment *_r,uint64_t now)
return sent; return sent;
} }
bool Peer::sendPing(const RuntimeEnvironment *_r,uint64_t now,bool firstSinceReset) bool Peer::sendPing(const RuntimeEnvironment *_r,uint64_t now)
{ {
bool sent = false; bool sent = false;
SharedPtr<Peer> self(this); SharedPtr<Peer> self(this);
// In the ping case we will never send TCP unless this returns true.
bool useTcp = isTcpFailoverTime(_r,now);
TRACE("PING %s (useTcp==%d)",_id.address().toString().c_str(),(int)useTcp);
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
// NOTE: this will never ping a peer that has *only* TCP paths. Right
// now there's never such a thing as TCP is only for failover.
bool pingTcp;
if (!firstSinceReset) {
uint64_t lastUdp = 0;
uint64_t lastTcp = 0;
uint64_t lastPing = 0;
for(std::vector<Path>::iterator p(_paths.begin());p!=_paths.end();++p) {
if (p->tcp())
lastTcp = std::max(p->lastReceived(),lastTcp);
else lastUdp = std::max(p->lastReceived(),lastUdp);
lastPing = std::max(p->lastPing(),lastPing);
}
uint64_t lastAny = std::max(lastUdp,lastTcp);
pingTcp = ( ( (lastAny < lastPing) && ((lastPing - lastAny) >= ZT_PING_UNANSWERED_AFTER) ) || (lastTcp > lastUdp) );
} else pingTcp = false;
TRACE("PING %s (pingTcp==%d)",_id.address().toString().c_str(),(int)pingTcp);
for(std::vector<Path>::iterator p(_paths.begin());p!=_paths.end();++p) { for(std::vector<Path>::iterator p(_paths.begin());p!=_paths.end();++p) {
if ((pingTcp)||(!p->tcp())) { if ((useTcp)||(!p->tcp())) {
if (_r->sw->sendHELLO(self,*p)) { if (_r->sw->sendHELLO(self,*p)) {
p->sent(now); p->sent(now);
p->pinged(now); p->pinged(now);

View File

@ -160,10 +160,9 @@ public:
* *
* @param _r Runtime environment * @param _r Runtime environment
* @param now Current time * @param now Current time
* @param firstSinceReset If true, this is the first ping sent since a network reset
* @return True if send appears successful for at least one address type * @return True if send appears successful for at least one address type
*/ */
bool sendPing(const RuntimeEnvironment *_r,uint64_t now,bool firstSinceReset); bool sendPing(const RuntimeEnvironment *_r,uint64_t now);
/** /**
* Called periodically by Topology::clean() to remove stale paths and do other cleanup * Called periodically by Topology::clean() to remove stale paths and do other cleanup
@ -263,6 +262,33 @@ public:
return _lastAnnouncedTo; return _lastAnnouncedTo;
} }
/**
* @param _r Runtime environment
* @param now Current time
* @return True if it's time to attempt TCP failover (if we have TCP_OUT paths)
*/
inline bool isTcpFailoverTime(const RuntimeEnvironment *_r,uint64_t now) const
throw()
{
if ((now - _r->timeOfLastResynchronize) >= ZT_TCP_TUNNEL_FAILOVER_TIMEOUT) {
uint64_t lastUdpPingSent = 0;
uint64_t lastUdpReceive = 0;
{
Mutex::Lock _l(_lock);
for(std::vector<Path>::const_iterator p(_paths.begin());p!=_paths.end();++p) {
if (p->type() == Path::PATH_TYPE_UDP) {
lastUdpPingSent = std::max(lastUdpPingSent,p->lastPing());
lastUdpReceive = std::max(lastUdpReceive,p->lastReceived());
}
}
}
return ( (lastUdpPingSent > lastUdpReceive) && ((now - lastUdpPingSent) >= ZT_TCP_TUNNEL_FAILOVER_TIMEOUT) );
}
return false;
}
/** /**
* @return Current latency or 0 if unknown (max: 65535) * @return Current latency or 0 if unknown (max: 65535)
*/ */

View File

@ -207,11 +207,10 @@ public:
class PingPeersThatNeedPing class PingPeersThatNeedPing
{ {
public: public:
PingPeersThatNeedPing(const RuntimeEnvironment *renv,uint64_t now,bool firstSinceReset) throw() : PingPeersThatNeedPing(const RuntimeEnvironment *renv,uint64_t now) throw() :
_now(now), _now(now),
_supernodeAddresses(renv->topology->supernodeAddresses()), _supernodeAddresses(renv->topology->supernodeAddresses()),
_r(renv), _r(renv) {}
_firstSinceReset(firstSinceReset) {}
inline void operator()(Topology &t,const SharedPtr<Peer> &p) inline void operator()(Topology &t,const SharedPtr<Peer> &p)
{ {
@ -228,14 +227,13 @@ public:
/* 2b: peer is not a supernode */ /* 2b: peer is not a supernode */
(!_supernodeAddresses.count(p->address())) (!_supernodeAddresses.count(p->address()))
) )
) { p->sendPing(_r,_now,_firstSinceReset); } ) { p->sendPing(_r,_now); }
} }
private: private:
uint64_t _now; uint64_t _now;
std::set<Address> _supernodeAddresses; std::set<Address> _supernodeAddresses;
const RuntimeEnvironment *_r; const RuntimeEnvironment *_r;
bool _firstSinceReset;
}; };
/** /**