diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h index 27b08ff5c..d905078a1 100644 --- a/include/ZeroTierOne.h +++ b/include/ZeroTierOne.h @@ -1347,8 +1347,16 @@ typedef struct */ char ifname[ZT_MAX_PHYSIFNAME]; + /** + * Pointer to PhySocket object for this path + */ uint64_t localSocket; + /** + * Local port corresponding to this path's localSocket + */ + uint16_t localPort; + /** * Is path expired? */ diff --git a/node/Bond.cpp b/node/Bond.cpp index 9bd9ee41e..7876003db 100644 --- a/node/Bond.cpp +++ b/node/Bond.cpp @@ -15,10 +15,10 @@ #include "Switch.hpp" +#include // for PRId64, etc. macros #include #include #include -#include // for PRId64, etc. macros // FIXME: remove this suppression and actually fix warnings #ifdef __GNUC__ @@ -108,7 +108,7 @@ bool Bond::setAllMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::s std::map >::iterator bondItr = _bonds.begin(); bool found = false; while (bondItr != _bonds.end()) { - if (bondItr->second->setMtuByTuple(mtu,ifStr,ipStr)) { + if (bondItr->second->setMtuByTuple(mtu, ifStr, ipStr)) { found = true; } ++bondItr; @@ -154,11 +154,13 @@ SharedPtr Bond::createBond(const RuntimeEnvironment* renv, const SharedPtr bond = new Bond(renv, _bondPolicyTemplates[_defaultPolicyStr].ptr(), peer); bond->debug("new default custom bond (based on %s)", bond->getPolicyStrByCode(bond->policy()).c_str()); } - } else { + } + else { if (! _bondPolicyTemplates[_policyTemplateAssignments[identity]]) { bond = new Bond(renv, _defaultPolicy, peer); bond->debug("peer-specific bond, was specified as %s but the bond definition was not found, using default %s", _policyTemplateAssignments[identity].c_str(), getPolicyStrByCode(_defaultPolicy).c_str()); - } else { + } + else { bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer); bond->debug("new default bond"); } @@ -227,10 +229,12 @@ SharedPtr Bond::getLinkBySocket(const std::string& policyAlias, uint64_t l SharedPtr s = new Link(ifnameStr, 0, 0, 0, true, ZT_BOND_SLAVE_MODE_PRIMARY, ""); _interfaceToLinkMap[policyAlias].insert(std::pair >(ifnameStr, s)); return s; - } else { + } + else { return SharedPtr(); } - } else { + } + else { return search->second; } } @@ -340,6 +344,7 @@ void Bond::nominatePathToBond(const SharedPtr& path, int64_t now) _paths[i].ipvPref = sl->ipvPref(); _paths[i].mode = sl->mode(); _paths[i].enabled = sl->enabled(); + _paths[i].localPort = _phy->getLocalPort((PhySocket*)((uintptr_t)path->localSocket())); _paths[i].onlyPathOnLink = ! bFoundCommonLink; } } @@ -397,7 +402,8 @@ SharedPtr Bond::getAppropriatePath(int64_t now, int32_t flowId) _rrPacketsSentOnCurrLink = 0; if (_numBondedPaths == 1 || _rrIdx >= (ZT_MAX_PEER_NETWORK_PATHS - 1)) { _rrIdx = 0; - } else { + } + else { int _tempIdx = _rrIdx; for (int searchCount = 0; searchCount < (_numBondedPaths - 1); searchCount++) { _tempIdx = (_tempIdx == (_numBondedPaths - 1)) ? 0 : _tempIdx + 1; @@ -427,7 +433,8 @@ SharedPtr Bond::getAppropriatePath(int64_t now, int32_t flowId) if (likely(it != _flows.end())) { it->second->lastActivity = now; return _paths[it->second->assignedPath].p; - } else { + } + else { unsigned char entropy; Utils::getSecureRandom(&entropy, 1); SharedPtr flow = createFlow(ZT_MAX_PEER_NETWORK_PATHS, flowId, entropy, now); @@ -505,7 +512,8 @@ void Bond::recordIncomingPacket(const SharedPtr& path, uint64_t packetId, _paths[pathIdx].qosStatsIn[packetId] = now; ++(_paths[pathIdx].packetsReceivedSinceLastQoS); //_paths[pathIdx].packetValiditySamples.push(true); - } else { + } + else { // debug("QoS buffer full, will not record information"); } /* @@ -532,7 +540,8 @@ void Bond::recordIncomingPacket(const SharedPtr& path, uint64_t packetId, SharedPtr flow; if (! _flows.count(flowId)) { flow = createFlow(pathIdx, flowId, 0, now); - } else { + } + else { flow = _flows[flowId]; } if (flow) { @@ -618,7 +627,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr& flow, int64_t now, bool reass if (reassign) { log("attempting to re-assign out-flow %04x previously on idx %d (%u / %zu flows)", flow->id, flow->assignedPath, _paths[_realIdxMap[flow->assignedPath]].assignedFlowCount, _flows.size()); - } else { + } + else { debug("attempting to assign flow for the first time"); } @@ -632,7 +642,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr& flow, int64_t now, bool reass if (reassign) { bondedIdx = (flow->assignedPath + offset) % (_numBondedPaths); - } else { + } + else { bondedIdx = abs((int)((entropy + offset) % (_numBondedPaths))); } // debug("idx=%d, offset=%d, randomCap=%f, actualCap=%f", bondedIdx, offset, randomLinkCapacity, _paths[_realIdxMap[bondedIdx]].relativeLinkCapacity); @@ -655,7 +666,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr& flow, int64_t now, bool reass flow->assignPath(_realIdxMap[bondedIdx], now); ++(_paths[_realIdxMap[bondedIdx]].assignedFlowCount); // debug(" ABLE to find optimal link %f idx %d", _paths[_realIdxMap[bondedIdx]].relativeQuality, bondedIdx); - } else { + } + else { // We were (unable) to find a path that didn't violate at least one quality requirement, will choose next best option flow->assignPath(_realIdxMap[nextBestQualIdx], now); ++(_paths[_realIdxMap[nextBestQualIdx]].assignedFlowCount); @@ -715,11 +727,13 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now) debug("forget flow %04x (age %" PRId64 ") (%u / %zu)", it->first, it->second->age(now), _paths[it->second->assignedPath].assignedFlowCount, (_flows.size() - 1)); _paths[it->second->assignedPath].assignedFlowCount--; it = _flows.erase(it); - } else { + } + else { ++it; } } - } else if (oldest) { // Remove single oldest by natural expiration + } + else if (oldest) { // Remove single oldest by natural expiration uint64_t maxAge = 0; while (it != _flows.end()) { if (it->second->age(now) > maxAge) { @@ -766,7 +780,8 @@ void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr& if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) { debug("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr); _negotiatedPathIdx = pathIdx; - } else { + } + else { debug("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr); } } @@ -881,7 +896,8 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con if (atAddress) { outp.armor(_peer->key(), false, _peer->aesKeysIfSupported()); RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size()); - } else { + } + else { RR->sw->send(tPtr, outp, false); } Metrics::pkt_qos_out++; @@ -1222,7 +1238,8 @@ void Bond::estimatePathQuality(int64_t now) if ((now - it->second) >= qosRecordTimeout) { it = _paths[i].qosStatsOut.erase(it); ++numDroppedQosOutRecords; - } else { + } + else { ++it; } } @@ -1250,7 +1267,8 @@ void Bond::estimatePathQuality(int64_t now) if ((now - it->second) >= qosRecordTimeout) { it = _paths[i].qosStatsIn.erase(it); ++numDroppedQosInRecords; - } else { + } + else { ++it; } } @@ -1327,10 +1345,10 @@ void Bond::estimatePathQuality(int64_t now) continue; } // Compute/Smooth average of real-world observations - if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) { + if (_paths[i].latencySamples.count() >= ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE) { _paths[i].latency = _paths[i].latencySamples.mean(); } - if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) { + if (_paths[i].latencySamples.count() >= ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE) { _paths[i].latencyVariance = _paths[i].latencySamples.stddev(); } @@ -1344,6 +1362,7 @@ void Bond::estimatePathQuality(int64_t now) //_paths[i].packetErrorRatio = 1.0 - (_paths[i].packetValiditySamples.count() ? _paths[i].packetValiditySamples.mean() : 1.0); // _valid is written elsewhere _paths[i].p->_relativeQuality = _paths[i].relativeQuality; + _paths[i].p->_localPort = _paths[i].localPort; } // Flag links for avoidance @@ -1370,7 +1389,8 @@ void Bond::estimatePathQuality(int64_t now) shouldAvoid = true; } _paths[i].shouldAvoid = shouldAvoid; - } else { + } + else { if (! shouldAvoid) { log("no longer avoiding link %s", pathToStr(_paths[i].p).c_str()); _paths[i].shouldAvoid = false; @@ -1482,7 +1502,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) _lastBondStatusLog = now; if (_abPathIdx == ZT_MAX_PEER_NETWORK_PATHS) { log("no active link"); - } else if (_paths[_abPathIdx].p) { + } + else if (_paths[_abPathIdx].p) { log("active link is %s, failover queue size is %zu", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size()); } if (_abFailoverQueue.empty()) { @@ -1590,7 +1611,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size()); } continue; - } else { + } + else { ++it; } } @@ -1739,7 +1761,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) if (! _abFailoverQueue.empty()) { dequeueNextActiveBackupPath(now); log("active link switched to %s", pathToStr(_paths[_abPathIdx].p).c_str()); - } else { + } + else { log("failover queue is empty, no links to choose from"); } } @@ -1785,7 +1808,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) dequeueNextActiveBackupPath(now); _lastPathNegotiationCheck = now; log("switch negotiated link %s (select mode: optimize)", pathToStr(_paths[_abPathIdx].p).c_str()); - } else { + } + else { // Try to find a better path and automatically switch to it -- not too often, though. if ((now - _lastActiveBackupPathChange) > ZT_BOND_OPTIMIZE_INTERVAL) { if (! _abFailoverQueue.empty()) { @@ -1901,7 +1925,7 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT } if (! _isLeaf) { - _policy = ZT_BOND_POLICY_ACTIVE_BACKUP; + _policy = ZT_BOND_POLICY_NONE; } // Timer geometry diff --git a/node/Bond.hpp b/node/Bond.hpp index c6347a8cb..408c1e125 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -315,7 +315,6 @@ class Peer; class Bond { public: - /** * Stop bond's internal functions (can be resumed) */ @@ -909,7 +908,8 @@ class Bond { _lastAckRateCheck = now; if (_ackCutoffCount > numToDrain) { _ackCutoffCount -= numToDrain; - } else { + } + else { _ackCutoffCount = 0; } return (_ackCutoffCount < ZT_ACK_CUTOFF_LIMIT); @@ -928,7 +928,8 @@ class Bond { uint64_t diff = now - _lastQoSRateCheck; if ((diff) <= (_qosSendInterval / ZT_MAX_PEER_NETWORK_PATHS)) { ++_qosCutoffCount; - } else { + } + else { _qosCutoffCount = 0; } _lastQoSRateCheck = now; @@ -948,7 +949,8 @@ class Bond { int diff = now - _lastPathNegotiationReceived; if ((diff) <= (ZT_PATH_NEGOTIATION_CUTOFF_TIME / ZT_MAX_PEER_NETWORK_PATHS)) { ++_pathNegotiationCutoffCount; - } else { + } + else { _pathNegotiationCutoffCount = 0; } _lastPathNegotiationReceived = now; @@ -1230,6 +1232,7 @@ class Bond { , packetsReceivedSinceLastQoS(0) , packetsIn(0) , packetsOut(0) + , localPort(0) { } @@ -1245,17 +1248,20 @@ class Bond { unsigned int suggestedRefractoryPeriod = refractoryPeriod ? punishment + (refractoryPeriod * 2) : punishment; refractoryPeriod = std::min(suggestedRefractoryPeriod, (unsigned int)ZT_BOND_MAX_REFRACTORY_PERIOD); lastRefractoryUpdate = 0; - } else { + } + else { uint32_t drainRefractory = 0; if (lastRefractoryUpdate) { drainRefractory = (now - lastRefractoryUpdate); - } else { + } + else { drainRefractory = (now - lastAliveToggle); } lastRefractoryUpdate = now; if (refractoryPeriod > drainRefractory) { refractoryPeriod -= drainRefractory; - } else { + } + else { refractoryPeriod = 0; lastRefractoryUpdate = 0; } @@ -1292,7 +1298,6 @@ class Bond { */ inline bool needsToSendQoS(int64_t now, uint64_t qosSendInterval) { - // fprintf(stderr, "QOS table (%d / %d)\n", packetsReceivedSinceLastQoS, ZT_QOS_TABLE_SIZE); return ((packetsReceivedSinceLastQoS >= ZT_QOS_TABLE_SIZE) || ((now - lastQoSMeasurement) > qosSendInterval)) && packetsReceivedSinceLastQoS; } @@ -1364,6 +1369,8 @@ class Bond { int packetsIn; int packetsOut; + uint16_t localPort; + // AtomicCounter __refCount; SharedPtr p; diff --git a/node/Constants.hpp b/node/Constants.hpp index 54995c29a..dacba21c5 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -399,6 +399,11 @@ */ #define ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE 64 +/** + * Number of samples required before statistics summaries are computed + */ +#define ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE 4 + /** * Max allowable time spent in any queue (in ms) */ diff --git a/node/Node.cpp b/node/Node.cpp index 4913d1a4c..f8d0401f2 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -578,6 +578,7 @@ ZT_PeerList *Node::peers() const if((*path)->valid()) { memcpy(&(p->paths[p->pathCount].address),&((*path)->address()),sizeof(struct sockaddr_storage)); p->paths[p->pathCount].localSocket = (*path)->localSocket(); + p->paths[p->pathCount].localPort = (*path)->localPort(); p->paths[p->pathCount].lastSend = (*path)->lastOut(); p->paths[p->pathCount].lastReceive = (*path)->lastIn(); p->paths[p->pathCount].trustedPathId = RR->topology->getOutboundPathTrust((*path)->address()); diff --git a/node/Path.hpp b/node/Path.hpp index b7694920e..c35bafdd1 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -84,6 +84,7 @@ public: _lastIn(0), _lastTrustEstablishedPacketReceived(0), _lastEchoRequestReceived(0), + _localPort(0), _localSocket(-1), _latencyMean(0.0), _latencyVariance(0.0), @@ -106,6 +107,7 @@ public: _lastIn(0), _lastTrustEstablishedPacketReceived(0), _lastEchoRequestReceived(0), + _localPort(0), _localSocket(localSocket), _latencyMean(0.0), _latencyVariance(0.0), @@ -177,6 +179,11 @@ public: */ inline int64_t localSocket() const { return _localSocket; } + /** + * @return Local port corresponding to the localSocket + */ + inline int64_t localPort() const { return _localPort; } + /** * @return Physical address */ @@ -375,6 +382,7 @@ private: int64_t _lastEchoRequestReceived; + uint16_t _localPort; int64_t _localSocket; volatile float _latencyMean; diff --git a/one.cpp b/one.cpp index 977da1f20..23912856c 100644 --- a/one.cpp +++ b/one.cpp @@ -664,37 +664,37 @@ static int cli(int argc,char **argv) printf("\nidx" " interface" " " - "path socket\n"); - for(int i=0; i<100; i++) { printf("-"); } + "path socket local port\n"); + for(int i=0; i<120; i++) { printf("-"); } printf("\n"); for (int i=0; i(s)->uptr); } + /** + * Return the local port corresponding to this PhySocket + * + * @param s Socket object + * + * @return Local port corresponding to this PhySocket + */ + static inline uint16_t getLocalPort(PhySocket* s) throw() + { + return reinterpret_cast(s)->localPort; + } + /** * Cause poll() to stop waiting immediately * @@ -417,6 +430,11 @@ public: sws.type = ZT_PHY_SOCKET_UDP; sws.sock = s; sws.uptr = uptr; + +#ifdef __UNIX_LIKE__ + struct sockaddr_in *sin = (struct sockaddr_in *)localAddress; + sws.localPort = htons(sin->sin_port); +#endif memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); memcpy(&(sws.saddr),localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); diff --git a/service/OneService.cpp b/service/OneService.cpp index 5bd67a2ca..b41abb602 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -637,6 +637,7 @@ static void _peerToJson(nlohmann::json &pj,const ZT_Peer *peer, SharedPtr j["expired"] = (bool)(peer->paths[i].expired != 0); j["preferred"] = (bool)(peer->paths[i].preferred != 0); j["localSocket"] = peer->paths[i].localSocket; + j["localPort"] = peer->paths[i].localPort; if (bond && peer->isBonded) { uint64_t now = OSUtils::now(); j["ifname"] = std::string(peer->paths[i].ifname);