diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h index a100afd96..5b228e171 100644 --- a/include/ZeroTierOne.h +++ b/include/ZeroTierOne.h @@ -1315,6 +1315,11 @@ typedef struct */ unsigned int pathCount; + /** + * Whether this peer was ever reachable via an aggregate link + */ + bool hadAggregateLink; + /** * Known network paths to peer */ diff --git a/node/Constants.hpp b/node/Constants.hpp index 0d3692f11..420343ad0 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -274,6 +274,19 @@ */ #define ZT_MULTIPATH_BINDER_REFRESH_PERIOD 5000 +/** + * Packets are only used for QoS/ACK statistical sampling if their packet ID is divisible by + * this integer. This is to provide a mechanism for both peers to agree on which packets need + * special treatment without having to exchange information. Changing this value would be + * a breaking change and would necessitate a protocol version upgrade. Since each incoming and + * outgoing packet ID is checked against this value its evaluation is of the form: + * (id & (divisor - 1)) == 0, thus the divisor must be a power of 2. + * + * This value is set at (16) so that given a normally-distributed RNG output we will sample + * 1/16th (or ~6.25%) of packets. + */ +#define ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR 0x10 + /** * Time horizon for VERB_QOS_MEASUREMENT and VERB_ACK packet processing cutoff */ @@ -384,7 +397,7 @@ /** * Minimum amount of time between each ACK packet */ -#define ZT_PATH_ACK_INTERVAL 250 +#define ZT_PATH_ACK_INTERVAL 1000 /** * How often an aggregate link statistics report is emitted into this tracing system diff --git a/node/Node.cpp b/node/Node.cpp index 24deeae2d..9b10dfdd9 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -450,6 +450,7 @@ ZT_PeerList *Node::peers() const for(std::vector< std::pair< Address,SharedPtr > >::iterator pi(peers.begin());pi!=peers.end();++pi) { ZT_Peer *p = &(pl->peers[pl->peerCount++]); p->address = pi->second->address().toInt(); + p->hadAggregateLink = 0; if (pi->second->remoteVersionKnown()) { p->versionMajor = pi->second->remoteVersionMajor(); p->versionMinor = pi->second->remoteVersionMinor(); @@ -466,6 +467,7 @@ ZT_PeerList *Node::peers() const std::vector< SharedPtr > paths(pi->second->paths(_now)); SharedPtr bestp(pi->second->getAppropriatePath(_now,false)); + p->hadAggregateLink |= pi->second->hasAggregateLink(); p->pathCount = 0; for(std::vector< SharedPtr >::iterator path(paths.begin());path!=paths.end();++path) { ZT_FAST_MEMCPY(&(p->paths[p->pathCount].address),&((*path)->address()),sizeof(struct sockaddr_storage)); @@ -475,14 +477,14 @@ ZT_PeerList *Node::peers() const p->paths[p->pathCount].expired = 0; p->paths[p->pathCount].preferred = ((*path) == bestp) ? 1 : 0; p->paths[p->pathCount].latency = (*path)->latency(); - p->paths[p->pathCount].packetDelayVariance = (*path)->packetDelayVariance(); + p->paths[p->pathCount].packetDelayVariance = (*path)->packetDelayVariance(); p->paths[p->pathCount].throughputDisturbCoeff = (*path)->throughputDisturbanceCoefficient(); p->paths[p->pathCount].packetErrorRatio = (*path)->packetErrorRatio(); p->paths[p->pathCount].packetLossRatio = (*path)->packetLossRatio(); p->paths[p->pathCount].stability = (*path)->lastComputedStability(); p->paths[p->pathCount].throughput = (*path)->meanThroughput(); p->paths[p->pathCount].maxThroughput = (*path)->maxLifetimeThroughput(); - p->paths[p->pathCount].allocation = (*path)->allocation(); + p->paths[p->pathCount].allocation = (float)(*path)->allocation() / (float)255; p->paths[p->pathCount].ifname = (*path)->getName(); ++p->pathCount; diff --git a/node/Path.hpp b/node/Path.hpp index fb2023069..cafff8cf3 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -121,7 +121,7 @@ public: _lastComputedStability(0.0), _lastComputedRelativeQuality(0), _lastComputedThroughputDistCoeff(0.0), - _lastAllocation(0.0) + _lastAllocation(0) { prepareBuffers(); } @@ -153,7 +153,7 @@ public: _lastComputedStability(0.0), _lastComputedRelativeQuality(0), _lastComputedThroughputDistCoeff(0.0), - _lastAllocation(0.0) + _lastAllocation(0) { prepareBuffers(); _phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, 16); @@ -316,12 +316,10 @@ public: { Mutex::Lock _l(_statistics_m); if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) { - if (packetId % 2 == 0) { // even -> use for ACK + if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) { _unackedBytes += payloadLength; // Take note that we're expecting a VERB_ACK on this path as of a specific time _expectingAckAsOf = ackAge(now) > ZT_PATH_ACK_INTERVAL ? _expectingAckAsOf : now; - } - else { // odd -> use for QoS if (_outQoSRecords.size() < ZT_PATH_MAX_OUTSTANDING_QOS_RECORDS) { _outQoSRecords[packetId] = now; } @@ -341,11 +339,9 @@ public: { Mutex::Lock _l(_statistics_m); if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) { - if (packetId % 2 == 0) { // even -> use for ACK + if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) { _inACKRecords[packetId] = payloadLength; _packetsReceivedSinceLastAck++; - } - else { // odd -> use for QoS _inQoSRecords[packetId] = now; _packetsReceivedSinceLastQoS++; } @@ -527,12 +523,12 @@ public: * * @param allocation Percentage of traffic to be sent over this path to a peer */ - inline void updateComponentAllocationOfAggregateLink(float allocation) { _lastAllocation = allocation; } + inline void updateComponentAllocationOfAggregateLink(unsigned char allocation) { _lastAllocation = allocation; } /** * @return Percentage of traffic allocated to this path in the aggregate link */ - inline float allocation() { return _lastAllocation; } + inline unsigned char allocation() { return _lastAllocation; } /** * @return Stability estimates can become expensive to compute, we cache the most recent result. @@ -704,7 +700,9 @@ private: float _lastComputedStability; float _lastComputedRelativeQuality; float _lastComputedThroughputDistCoeff; - float _lastAllocation; + unsigned char _lastAllocation; + + // cached human-readable strings for tracing purposes char _ifname[16]; diff --git a/node/Peer.cpp b/node/Peer.cpp index 1d581ab8e..21bbfabe2 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -56,6 +56,12 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident _lastSentFullHello(0), _lastACKWindowReset(0), _lastQoSWindowReset(0), + _lastMultipathCompatibilityCheck(0), + _freeRandomByte(0), + _uniqueAlivePathCount(0), + _localMultipathSupported(false), + _remoteMultipathSupported(false), + _canUseMultipath(false), _vProto(0), _vMajor(0), _vMinor(0), @@ -69,6 +75,7 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident _lastAggregateStatsReport(0), _lastAggregateAllocation(0) { + Utils::getSecureRandom(&_freeRandomByte, 1); if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH)) throw ZT_EXCEPTION_INVALID_ARGUMENT; _pathChoiceHist = new RingBuffer(ZT_MULTIPATH_PROPORTION_WIN_SZ); @@ -110,7 +117,7 @@ void Peer::received( recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now); - if (canUseMultipath()) { + if (_canUseMultipath) { if (path->needsToSendQoS(now)) { sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now); } @@ -145,17 +152,23 @@ void Peer::received( // Paths are redundant if they duplicate an alive path to the same IP or // with the same local socket and address family. bool redundant = false; + unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS; for(unsigned int i=0;ialive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) { + if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual2(path->address())) ) ) { redundant = true; break; } + // If the path is the same address and port, simply assume this is a replacement + if ( (_paths[i].p->address().ipsEqual2(path->address()) && (_paths[i].p->address().port() == path->address().port()))) { + replacePath = i; + break; + } } else break; } - - if (!redundant) { - unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS; + // If the path isn't a duplicate of the same localSocket AND we haven't already determined a replacePath, + // then find the worst path and replace it. + if (!redundant && replacePath == ZT_MAX_PEER_NETWORK_PATHS) { int replacePathQuality = 0; for(unsigned int i=0;iaddress().ss_family == path->address().ss_family && _paths[i].p->address().ipsEqual2(path->address())) { - replacePath = i; - break; - } - } - } - } - - if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) { - if (verb == Packet::VERB_OK) { - RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId); - _paths[replacePath].lr = now; - _paths[replacePath].p = path; - _paths[replacePath].priority = 1; - } else { - attemptToContact = true; - } + } + if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) { + if (verb == Packet::VERB_OK) { + RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId); + _paths[replacePath].lr = now; + _paths[replacePath].p = path; + _paths[replacePath].priority = 1; + } else { + attemptToContact = true; } } } @@ -274,7 +273,9 @@ void Peer::received( void Peer::recordOutgoingPacket(const SharedPtr &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now) { - if (localMultipathSupport()) { + // Grab second byte from packetId to use as a source of entropy in the next path selection + _freeRandomByte = (packetId & 0xFF00) >> 8; + if (_canUseMultipath) { path->recordOutgoingPacket(now, packetId, payloadLength, verb); } } @@ -282,7 +283,7 @@ void Peer::recordOutgoingPacket(const SharedPtr &path, const uint64_t pack void Peer::recordIncomingPacket(void *tPtr, const SharedPtr &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now) { - if (localMultipathSupport()) { + if (_canUseMultipath) { if (path->needsToSendAck(now)) { sendACK(tPtr, path, path->localSocket(), path->address(), now); } @@ -323,6 +324,9 @@ void Peer::computeAggregateProportionalAllocation(int64_t now) + (fmax(1, relThroughput[i]) * ZT_PATH_CONTRIB_THROUGHPUT) + relScope * ZT_PATH_CONTRIB_SCOPE; relQuality *= age_contrib; + // Arbitrary cutoffs + relQuality = relQuality > (1.00 / 100.0) ? relQuality : 0.0; + relQuality = relQuality < (99.0 / 100.0) ? relQuality : 1.0; totalRelativeQuality += relQuality; _paths[i].p->updateRelativeQuality(relQuality); } @@ -330,12 +334,12 @@ void Peer::computeAggregateProportionalAllocation(int64_t now) // Convert set of relative performances into an allocation set for(uint16_t i=0;iupdateComponentAllocationOfAggregateLink(_paths[i].p->relativeQuality() / totalRelativeQuality); + _paths[i].p->updateComponentAllocationOfAggregateLink((_paths[i].p->relativeQuality() / totalRelativeQuality) * 255); } } } -float Peer::computeAggregateLinkPacketDelayVariance() +int Peer::computeAggregateLinkPacketDelayVariance() { float pdv = 0.0; for(unsigned int i=0;i Peer::getAppropriatePath(int64_t now, bool includeExpired) * Send traffic across the highest quality path only. This algorithm will still * use the old path quality metric from protocol version 9. */ - if (!canUseMultipath()) { + if (!_canUseMultipath) { long bestPathQuality = 2147483647; for(unsigned int i=0;i Peer::getAppropriatePath(int64_t now, bool includeExpired) } } } - unsigned int r; - Utils::getSecureRandom(&r, 1); + unsigned int r = _freeRandomByte; if (numAlivePaths > 0) { - // pick a random out of the set deemed "alive" int rf = r % numAlivePaths; return _paths[alivePaths[rf]].p; } else if(numStalePaths > 0) { - // resort to trying any non-expired path + // Resort to trying any non-expired path int rf = r % numStalePaths; return _paths[stalePaths[rf]].p; } @@ -461,40 +463,12 @@ SharedPtr Peer::getAppropriatePath(int64_t now, bool includeExpired) * Proportionally allocate traffic according to dynamic path quality measurements */ if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) { - int numAlivePaths = 0; - int numStalePaths = 0; - int alivePaths[ZT_MAX_PEER_NETWORK_PATHS]; - int stalePaths[ZT_MAX_PEER_NETWORK_PATHS]; - memset(&alivePaths, -1, sizeof(alivePaths)); - memset(&stalePaths, -1, sizeof(stalePaths)); - // Attempt to find an excuse not to use the rest of this algorithm - // Alive or Stale? - for(unsigned int i=0;ialive(now)) { - alivePaths[numAlivePaths] = i; - numAlivePaths++; - } else { - stalePaths[numStalePaths] = i; - numStalePaths++; - } - // Record a default path to use as a short-circuit for the rest of the algorithm (if needed) - bestPath = i; - } - } if ((now - _lastAggregateAllocation) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) { _lastAggregateAllocation = now; computeAggregateProportionalAllocation(now); } - if (numAlivePaths == 0 && numStalePaths == 0) { - return SharedPtr(); - } if (numAlivePaths == 1 || numStalePaths == 1) { - return _paths[bestPath].p; - } // Randomly choose path according to their allocations - unsigned int r; - Utils::getSecureRandom(&r, 1); - float rf = (float)(r %= 100) / 100; + float rf = _freeRandomByte; for(int i=0;iallocation()) { @@ -676,6 +650,41 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr &o } } +inline void Peer::processBackgroundPeerTasks(int64_t now) +{ + // Determine current multipath compatibility with other peer + if ((now - _lastMultipathCompatibilityCheck) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) { + // Cache number of available paths so that we can short-circuit multipath logic elsewhere + // + // We also take notice of duplicate paths (same IP only) because we may have + // recently received a direct path push from a peer and our list might contain + // a dead path which hasn't been fully recognized as such. In this case we + // don't want the duplicate to trigger execution of multipath code prematurely. + // + // This is done to support the behavior of auto multipath enable/disable + // without user intervention. + int currAlivePathCount = 0; + int duplicatePathsFound = 0; + for (unsigned int i=0;iaddress().ipsEqual2(_paths[j].p->address()) && i != j) { + duplicatePathsFound+=1; + break; + } + } + } + } + _uniqueAlivePathCount = (currAlivePathCount - (duplicatePathsFound / 2)); + _lastMultipathCompatibilityCheck = now; + _localMultipathSupported = ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9)); + _remoteMultipathSupported = _vProto > 9; + // If both peers support multipath and more than one path exist, we can use multipath logic + _canUseMultipath = _localMultipathSupported && _remoteMultipathSupported && (_uniqueAlivePathCount > 1); + } +} + void Peer::sendACK(void *tPtr,const SharedPtr &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now) { Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ACK); @@ -774,14 +783,15 @@ void Peer::tryMemorizedPath(void *tPtr,int64_t now) unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now) { unsigned int sent = 0; - Mutex::Lock _l(_paths_m); const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD); _lastSentFullHello = now; + processBackgroundPeerTasks(now); + // Emit traces regarding aggregate link status - if (canUseMultipath()) { + if (_canUseMultipath) { int alivePathCount = aggregateLinkPhysicalPathCount(); if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) { _lastAggregateStatsReport = now; diff --git a/node/Peer.hpp b/node/Peer.hpp index ddbe6f770..a32eaad06 100644 --- a/node/Peer.hpp +++ b/node/Peer.hpp @@ -203,12 +203,12 @@ public: /** * @return The aggregate link Packet Delay Variance (PDV) */ - float computeAggregateLinkPacketDelayVariance(); + int computeAggregateLinkPacketDelayVariance(); /** * @return The aggregate link mean latency */ - float computeAggregateLinkMeanLatency(); + int computeAggregateLinkMeanLatency(); /** * @return The number of currently alive "physical" paths in the aggregate link @@ -357,7 +357,7 @@ public: */ inline unsigned int latency(const int64_t now) { - if (RR->node->getMultipathMode()) { + if (_canUseMultipath) { return (int)computeAggregateLinkMeanLatency(); } else { SharedPtr bp(getAppropriatePath(now,false)); @@ -417,6 +417,14 @@ public: inline bool remoteVersionKnown() const { return ((_vMajor > 0)||(_vMinor > 0)||(_vRevision > 0)); } + /** + * Periodically update known multipath activation constraints. This is done so that we know when and when + * not to use multipath logic. Doing this once every few seconds is sufficient. + * + * @param now Current time + */ + inline void processBackgroundPeerTasks(int64_t now); + /** * Record that the remote peer does have multipath enabled. As is evident by the receipt of a VERB_ACK * or a VERB_QOS_MEASUREMENT packet at some point in the past. Until this flag is set, the local client @@ -427,18 +435,18 @@ public: /** * @return Whether the local client supports and is configured to use multipath */ - inline bool localMultipathSupport() { return ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9)); } + inline bool localMultipathSupport() { return _localMultipathSupported; } /** * @return Whether the remote peer supports and is configured to use multipath */ - inline bool remoteMultipathSupport() { return (_remotePeerMultipathEnabled && (_vProto > 9)); } + inline bool remoteMultipathSupport() { return _remoteMultipathSupported; } /** * @return Whether this client can use multipath to communicate with this peer. True if both peers are using * the correct protocol and if both peers have multipath enabled. False if otherwise. */ - inline bool canUseMultipath() { return (localMultipathSupport() && remoteMultipathSupport()); } + inline bool canUseMultipath() { return _canUseMultipath; } /** * @return True if peer has received a trust established packet (e.g. common network membership) in the past ZT_TRUST_EXPIRATION ms @@ -557,6 +565,13 @@ public: return (_QoSCutoffCount < ZT_PATH_QOS_ACK_CUTOFF_LIMIT); } + /** + * @return Whether this peer is reachable via an aggregate link + */ + inline bool hasAggregateLink() { + return _localMultipathSupported && _remoteMultipathSupported && _remotePeerMultipathEnabled; + } + /** * Serialize a peer for storage in local cache * @@ -658,6 +673,15 @@ private: int64_t _lastPathPrune; int64_t _lastACKWindowReset; int64_t _lastQoSWindowReset; + int64_t _lastMultipathCompatibilityCheck; + + unsigned char _freeRandomByte; + + int _uniqueAlivePathCount; + + bool _localMultipathSupported; + bool _remoteMultipathSupported; + bool _canUseMultipath; uint16_t _vProto; uint16_t _vMajor; diff --git a/service/OneService.cpp b/service/OneService.cpp index a1a9d981b..9b12f17b2 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -1229,7 +1229,7 @@ public: char peerAddrStr[256]; if (pl) { for(unsigned long i=0;ipeerCount;++i) { - if (pl->peers[i].role == ZT_PEER_ROLE_LEAF) { + if (pl->peers[i].hadAggregateLink) { nlohmann::json pj; _peerAggregateLinkToJson(pj,&(pl->peers[i])); OSUtils::ztsnprintf(peerAddrStr,sizeof(peerAddrStr),"%.10llx",pl->peers[i].address);