Mark geo-redirected paths as suboptimal and do not report that we have a peer if all we have is one of these. Also a few other small fixes.

This commit is contained in:
Adam Ierymenko 2015-11-09 14:25:28 -08:00
parent 47424df417
commit 35c4e28f31
6 changed files with 60 additions and 47 deletions

View File

@ -239,7 +239,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
case CLUSTER_MESSAGE_WANT_PEER: { case CLUSTER_MESSAGE_WANT_PEER: {
const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
SharedPtr<Peer> peer(RR->topology->getPeerNoCache(zeroTierAddress)); SharedPtr<Peer> peer(RR->topology->getPeerNoCache(zeroTierAddress));
if ( (peer) && (peer->hasActiveDirectPath(RR->node->now())) ) { if ( (peer) && (peer->hasClusterOptimalPath(RR->node->now())) ) {
Buffer<1024> buf; Buffer<1024> buf;
peer->identity().serialize(buf); peer->identity().serialize(buf);
Mutex::Lock _l2(_members[fromMemberId].lock); Mutex::Lock _l2(_members[fromMemberId].lock);

View File

@ -55,7 +55,7 @@
* A cluster member is considered dead and will no longer have peers * A cluster member is considered dead and will no longer have peers
* redirected to it if we have not heard a heartbeat in this long. * redirected to it if we have not heard a heartbeat in this long.
*/ */
#define ZT_CLUSTER_TIMEOUT 10000 #define ZT_CLUSTER_TIMEOUT 5000
/** /**
* Desired period between doPeriodicTasks() in milliseconds * Desired period between doPeriodicTasks() in milliseconds

View File

@ -37,6 +37,16 @@
#include "Constants.hpp" #include "Constants.hpp"
#include "InetAddress.hpp" #include "InetAddress.hpp"
/**
* Flag indicating that this path is suboptimal
*
* This is used in cluster mode to indicate that the peer has been directed
* to a better path. This path can continue to be used but shouldn't be kept
* or advertised to other cluster members. Not used if clustering is not
* built and enabled.
*/
#define ZT_PATH_FLAG_CLUSTER_SUBOPTIMAL 0x0001
namespace ZeroTier { namespace ZeroTier {
class RuntimeEnvironment; class RuntimeEnvironment;
@ -54,6 +64,7 @@ public:
_lastReceived(0), _lastReceived(0),
_addr(), _addr(),
_localAddress(), _localAddress(),
_flags(0),
_ipScope(InetAddress::IP_SCOPE_NONE) _ipScope(InetAddress::IP_SCOPE_NONE)
{ {
} }
@ -63,12 +74,12 @@ public:
_lastReceived(0), _lastReceived(0),
_addr(addr), _addr(addr),
_localAddress(localAddress), _localAddress(localAddress),
_flags(0),
_ipScope(addr.ipScope()) _ipScope(addr.ipScope())
{ {
} }
inline Path &operator=(const Path &p) inline Path &operator=(const Path &p)
throw()
{ {
if (this != &p) if (this != &p)
memcpy(this,&p,sizeof(Path)); memcpy(this,&p,sizeof(Path));
@ -82,22 +93,14 @@ public:
* *
* @param t Time of send * @param t Time of send
*/ */
inline void sent(uint64_t t) inline void sent(uint64_t t) { _lastSend = t; }
throw()
{
_lastSend = t;
}
/** /**
* Called when a packet is received from this remote path * Called when a packet is received from this remote path
* *
* @param t Time of receive * @param t Time of receive
*/ */
inline void received(uint64_t t) inline void received(uint64_t t) { _lastReceived = t; }
throw()
{
_lastReceived = t;
}
/** /**
* @param now Current time * @param now Current time
@ -207,26 +210,40 @@ public:
return false; return false;
} }
#ifdef ZT_ENABLE_CLUSTER
	/**
	 * Set or clear the cluster-suboptimal flag on this path
	 *
	 * @param f New value of ZT_PATH_FLAG_CLUSTER_SUBOPTIMAL
	 */
	inline void setClusterSuboptimal(bool f)
	{
		if (f)
			_flags |= ZT_PATH_FLAG_CLUSTER_SUBOPTIMAL;
		else _flags &= ~ZT_PATH_FLAG_CLUSTER_SUBOPTIMAL;
	}

	/**
	 * @return True if ZT_PATH_FLAG_CLUSTER_SUBOPTIMAL is set
	 */
	inline bool isClusterSuboptimal() const { return ((_flags & ZT_PATH_FLAG_CLUSTER_SUBOPTIMAL) != 0); }
#endif
template<unsigned int C> template<unsigned int C>
inline void serialize(Buffer<C> &b) const inline void serialize(Buffer<C> &b) const
{ {
b.append((uint8_t)1); // version b.append((uint8_t)0); // version
b.append((uint64_t)_lastSend); b.append((uint64_t)_lastSend);
b.append((uint64_t)_lastReceived); b.append((uint64_t)_lastReceived);
_addr.serialize(b); _addr.serialize(b);
_localAddress.serialize(b); _localAddress.serialize(b);
b.append((uint16_t)_flags);
} }
template<unsigned int C> template<unsigned int C>
inline unsigned int deserialize(const Buffer<C> &b,unsigned int startAt = 0) inline unsigned int deserialize(const Buffer<C> &b,unsigned int startAt = 0)
{ {
unsigned int p = startAt; unsigned int p = startAt;
if (b[p++] != 1) if (b[p++] != 0)
throw std::invalid_argument("invalid serialized Path"); throw std::invalid_argument("invalid serialized Path");
_lastSend = b.template at<uint64_t>(p); p += 8; _lastSend = b.template at<uint64_t>(p); p += 8;
_lastReceived = b.template at<uint64_t>(p); p += 8; _lastReceived = b.template at<uint64_t>(p); p += 8;
p += _addr.deserialize(b,p); p += _addr.deserialize(b,p);
p += _localAddress.deserialize(b,p); p += _localAddress.deserialize(b,p);
_flags = b.template at<uint16_t>(p); p += 2;
_ipScope = _addr.ipScope(); _ipScope = _addr.ipScope();
return (p - startAt); return (p - startAt);
} }
@ -236,6 +253,7 @@ private:
uint64_t _lastReceived; uint64_t _lastReceived;
InetAddress _addr; InetAddress _addr;
InetAddress _localAddress; InetAddress _localAddress;
unsigned int _flags;
InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often
}; };

View File

@ -83,6 +83,7 @@ void Peer::received(
Packet::Verb inReVerb) Packet::Verb inReVerb)
{ {
#ifdef ZT_ENABLE_CLUSTER #ifdef ZT_ENABLE_CLUSTER
bool suboptimalPath = false;
if ((RR->cluster)&&(hops == 0)) { if ((RR->cluster)&&(hops == 0)) {
// Note: findBetterEndpoint() is first since we still want to check // Note: findBetterEndpoint() is first since we still want to check
// for a better endpoint even if we don't actually send a redirect. // for a better endpoint even if we don't actually send a redirect.
@ -124,6 +125,7 @@ void Peer::received(
RR->antiRec->logOutgoingZT(outp.data(),outp.size()); RR->antiRec->logOutgoingZT(outp.data(),outp.size());
RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size()); RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size());
} }
suboptimalPath = true;
} }
} }
#endif #endif
@ -151,6 +153,9 @@ void Peer::received(
for(unsigned int p=0;p<np;++p) { for(unsigned int p=0;p<np;++p) {
if ((_paths[p].address() == remoteAddr)&&(_paths[p].localAddress() == localAddr)) { if ((_paths[p].address() == remoteAddr)&&(_paths[p].localAddress() == localAddr)) {
_paths[p].received(now); _paths[p].received(now);
#ifdef ZT_ENABLE_CLUSTER
_paths[p].setClusterSuboptimal(suboptimalPath);
#endif
pathIsConfirmed = true; pathIsConfirmed = true;
break; break;
} }
@ -174,6 +179,9 @@ void Peer::received(
if (slot) { if (slot) {
*slot = Path(localAddr,remoteAddr); *slot = Path(localAddr,remoteAddr);
slot->received(now); slot->received(now);
#ifdef ZT_ENABLE_CLUSTER
slot->setClusterSuboptimal(suboptimalPath);
#endif
_numPaths = np; _numPaths = np;
pathIsConfirmed = true; pathIsConfirmed = true;
_sortPaths(now); _sortPaths(now);

View File

@ -269,7 +269,6 @@ public:
* @param l Direct latency measurement in ms * @param l Direct latency measurement in ms
*/ */
inline void addDirectLatencyMeasurment(unsigned int l) inline void addDirectLatencyMeasurment(unsigned int l)
throw()
{ {
unsigned int ol = _latency; unsigned int ol = _latency;
if ((ol > 0)&&(ol < 10000)) if ((ol > 0)&&(ol < 10000))
@ -282,7 +281,6 @@ public:
* @return True if this peer has at least one active direct path * @return True if this peer has at least one active direct path
*/ */
inline bool hasActiveDirectPath(uint64_t now) const inline bool hasActiveDirectPath(uint64_t now) const
throw()
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
for(unsigned int p=0,np=_numPaths;p<np;++p) { for(unsigned int p=0,np=_numPaths;p<np;++p) {
@ -292,6 +290,22 @@ public:
return false; return false;
} }
#ifdef ZT_ENABLE_CLUSTER
	/**
	 * @param now Current time
	 * @return True if this peer has at least one active direct path that is not cluster-suboptimal
	 */
	inline bool hasClusterOptimalPath(uint64_t now) const
	{
		Mutex::Lock _l(_lock);
		const unsigned int np = _numPaths;
		for(unsigned int i=0;i<np;++i) {
			// A path counts only if it is both active and not flagged
			// as suboptimal by the cluster geo-redirect logic.
			if (!_paths[i].active(now))
				continue;
			if (!_paths[i].isClusterSuboptimal())
				return true;
		}
		return false;
	}
#endif
/** /**
* Reset paths within a given scope * Reset paths within a given scope
* *
@ -329,33 +343,6 @@ public:
inline unsigned int remoteVersionRevision() const throw() { return _vRevision; } inline unsigned int remoteVersionRevision() const throw() { return _vRevision; }
inline bool remoteVersionKnown() const throw() { return ((_vMajor > 0)||(_vMinor > 0)||(_vRevision > 0)); } inline bool remoteVersionKnown() const throw() { return ((_vMajor > 0)||(_vMinor > 0)||(_vRevision > 0)); }
/**
* Check whether this peer's version is both known and is at least what is specified
*
* @param major Major version to check against
* @param minor Minor version
* @param rev Revision
* @return True if peer's version is at least supplied tuple
*/
inline bool atLeastVersion(unsigned int major,unsigned int minor,unsigned int rev)
throw()
{
Mutex::Lock _l(_lock);
if ((_vMajor > 0)||(_vMinor > 0)||(_vRevision > 0)) {
if (_vMajor > major)
return true;
else if (_vMajor == major) {
if (_vMinor > minor)
return true;
else if (_vMinor == minor) {
if (_vRevision >= rev)
return true;
}
}
}
return false;
}
/** /**
* Get most recently active path addresses for IPv4 and/or IPv6 * Get most recently active path addresses for IPv4 and/or IPv6
* *
@ -467,7 +454,7 @@ public:
const unsigned int recSizePos = b.size(); const unsigned int recSizePos = b.size();
b.addSize(4); // space for uint32_t field length b.addSize(4); // space for uint32_t field length
b.append((uint16_t)1); // version of serialized Peer data b.append((uint16_t)0); // version of serialized Peer data
_id.serialize(b,false); _id.serialize(b,false);
@ -531,7 +518,7 @@ public:
const unsigned int recSize = b.template at<uint32_t>(p); p += 4; const unsigned int recSize = b.template at<uint32_t>(p); p += 4;
if ((p + recSize) > b.size()) if ((p + recSize) > b.size())
return SharedPtr<Peer>(); // size invalid return SharedPtr<Peer>(); // size invalid
if (b.template at<uint16_t>(p) != 1) if (b.template at<uint16_t>(p) != 0)
return SharedPtr<Peer>(); // version mismatch return SharedPtr<Peer>(); // version mismatch
p += 2; p += 2;

View File

@ -517,7 +517,7 @@ public:
for(int k=0;k<512;++k) { for(int k=0;k<512;++k) {
const unsigned int upnport = 40000 + (((port + 1) * (k + 1)) % 25500); const unsigned int upnport = 40000 + (((port + 1) * (k + 1)) % 25500);
_v4UpnpLocalAddress = InetAddress(0,upnport); _v4UpnpLocalAddress = InetAddress(0,upnport);
_v4UpnpUdpSocket = _phy.udpBind((const struct sockaddr *)&_v4UpnpLocalAddress,reinterpret_cast<void *>(&_v4UpnpLocalAddress),131072); _v4UpnpUdpSocket = _phy.udpBind((const struct sockaddr *)&_v4UpnpLocalAddress,reinterpret_cast<void *>(&_v4UpnpLocalAddress),ZT_UDP_DESIRED_BUF_SIZE);
if (_v4UpnpUdpSocket) { if (_v4UpnpUdpSocket) {
_upnpClient = new UPNPClient(upnport); _upnpClient = new UPNPClient(upnport);
break; break;