diff --git a/node/Bond.cpp b/node/Bond.cpp index 0d8d44897..14d1e1023 100644 --- a/node/Bond.cpp +++ b/node/Bond.cpp @@ -189,16 +189,19 @@ void Bond::processBackgroundTasks(void* tPtr, const int64_t now) Bond::Bond(const RuntimeEnvironment* renv) : RR(renv) { + initTimers(); } Bond::Bond(const RuntimeEnvironment* renv, int policy, const SharedPtr& peer) : RR(renv), _freeRandomByte((unsigned char)((uintptr_t)this >> 4) ^ ++s_freeRandomByteCounter), _peer(peer), _peerId(_peer->_id.address().toInt()) { + initTimers(); setBondParameters(policy, SharedPtr(), false); _policyAlias = getPolicyStrByCode(policy); } Bond::Bond(const RuntimeEnvironment* renv, std::string& basePolicy, std::string& policyAlias, const SharedPtr& peer) : RR(renv), _policyAlias(policyAlias), _peer(peer) { + initTimers(); setBondParameters(getPolicyCodeByStr(basePolicy), SharedPtr(), false); } @@ -208,6 +211,7 @@ Bond::Bond(const RuntimeEnvironment* renv, SharedPtr originalBond, const S , _peer(peer) , _peerId(_peer->_id.address().toInt()) { + initTimers(); setBondParameters(originalBond->_policy, originalBond, true); } @@ -756,20 +760,22 @@ void Bond::processBackgroundBondTasks(void* tPtr, int64_t now) // Send ambient monitoring traffic for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { if (_paths[i].p && _paths[i].allowed()) { - if ((_monitorInterval > 0) && ((now - _paths[i].p->_lastOut) >= _monitorInterval)) { - if ((_peer->remoteVersionProtocol() >= 5) && (! ((_peer->remoteVersionMajor() == 1) && (_peer->remoteVersionMinor() == 1) && (_peer->remoteVersionRevision() == 0)))) { - Packet outp(_peer->address(), RR->identity.address(), Packet::VERB_ECHO); // ECHO (this is our bond's heartbeat) - outp.armor(_peer->key(), true, _peer->aesKeysIfSupported()); - RR->node->expectReplyTo(outp.packetId()); - RR->node->putPacket(tPtr, _paths[i].p->localSocket(), _paths[i].p->address(), outp.data(), outp.size()); - _paths[i].p->_lastOut = now; - _overheadBytes += outp.size(); - debug("sent ECHO via link %s", pathToStr(_paths[i].p).c_str()); + if (_isLeaf) { + if ((_monitorInterval > 0) && (((now - _paths[i].p->_lastIn) >= _monitorInterval) /*|| ((now - _paths[i].p->_lastOut) >= _monitorInterval)*/)) { + if ((_peer->remoteVersionProtocol() >= 5) && (! ((_peer->remoteVersionMajor() == 1) && (_peer->remoteVersionMinor() == 1) && (_peer->remoteVersionRevision() == 0)))) { + Packet outp(_peer->address(), RR->identity.address(), Packet::VERB_ECHO); // ECHO (this is our bond's heartbeat) + outp.armor(_peer->key(), true, _peer->aesKeysIfSupported()); + RR->node->expectReplyTo(outp.packetId()); + RR->node->putPacket(tPtr, _paths[i].p->localSocket(), _paths[i].p->address(), outp.data(), outp.size()); + _paths[i].p->_lastOut = now; + _overheadBytes += outp.size(); + debug("tx: verb 0x%-2x of len %4d via %s (ECHO)", Packet::VERB_ECHO, outp.size(), pathToStr(_paths[i].p).c_str()); + } + } + // QOS + if (_paths[i].needsToSendQoS(now, _qosSendInterval)) { + sendQOS_MEASUREMENT(tPtr, i, _paths[i].p->localSocket(), _paths[i].p->address(), now); } - } - // QOS - if (_paths[i].needsToSendQoS(now, _qosSendInterval)) { - sendQOS_MEASUREMENT(tPtr, i, _paths[i].p->localSocket(), _paths[i].p->address(), now); } } } @@ -799,6 +805,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond) { uint8_t tmpNumAliveLinks = 0; uint8_t tmpNumTotalLinks = 0; + /** * Update path state variables. State variables are used so that critical * blocks that perform fast packet processing won't need to make as many @@ -816,14 +823,14 @@ void Bond::curateBond(int64_t now, bool rebuildBond) /** * Determine aliveness */ - _paths[i].alive = (now - _paths[i].p->_lastIn) < _failoverInterval; + _paths[i].alive = _isLeaf ? (now - _paths[i].p->_lastIn) < _failoverInterval : (now - _paths[i].p->_lastIn) < ZT_PEER_PATH_EXPIRATION; /** * Determine current eligibility */ bool currEligibility = false; // Simple RX age (driven by packets of any type and gratuitous VERB_HELLOs) - bool acceptableAge = _paths[i].p->age(now) < (_failoverInterval + _downDelay); + bool acceptableAge = _isLeaf ? (_paths[i].p->age(now) < (_failoverInterval + _downDelay)) : _paths[i].alive; // Whether we've waited long enough since the link last came online bool satisfiedUpDelay = (now - _paths[i].lastAliveToggle) >= _upDelay; // Whether this path is still in its trial period @@ -1515,6 +1522,22 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) } } +void Bond::initTimers() +{ + _lastFlowExpirationCheck = 0; + _lastFlowRebalance = 0; + _lastSentPathNegotiationRequest = 0; + _lastPathNegotiationCheck = 0; + _lastPathNegotiationReceived = 0; + _lastQoSRateCheck = 0; + _lastQualityEstimation = 0; + _lastBondStatusLog = 0; + _lastSummaryDump = 0; + _lastActiveBackupPathChange = 0; + _lastFrame = 0; + _lastBackgroundTaskCheck = 0; +} + void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useTemplate) { // Sanity check for policy @@ -1522,27 +1545,26 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT _defaultPolicy = (_defaultPolicy <= ZT_BOND_POLICY_NONE || _defaultPolicy > ZT_BOND_POLICY_BALANCE_AWARE) ? ZT_BOND_POLICY_NONE : _defaultPolicy; _policy = (policy <= ZT_BOND_POLICY_NONE || policy > ZT_BOND_POLICY_BALANCE_AWARE) ? ZT_BOND_POLICY_NONE : _defaultPolicy; + // Check if non-leaf to prevent spamming infrastructure + if (_peer) { + ZT_PeerRole role = RR->topology->role(_peer->address()); + _isLeaf = (role != ZT_PEER_ROLE_PLANET && role != ZT_PEER_ROLE_MOON); + } + // Flows - _lastFlowExpirationCheck = 0; - _lastFlowRebalance = 0; _allowFlowHashing = false; // Path negotiation - _lastSentPathNegotiationRequest = 0; - _lastPathNegotiationCheck = 0; _allowPathNegotiation = false; _pathNegotiationCutoffCount = 0; - _lastPathNegotiationReceived = 0; _localUtility = 0; _negotiatedPathIdx = 0; // QOS Verb (and related checks) _qosCutoffCount = 0; - _lastQoSRateCheck = 0; - _lastQualityEstimation = 0; // User preferences which may override the default bonding algorithm's behavior @@ -1552,8 +1574,6 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT // Bond status - _lastBondStatusLog = 0; - _lastSummaryDump = 0; _isHealthy = false; _numAliveLinks = 0; _numTotalLinks = 0; @@ -1561,7 +1581,6 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT // active-backup - _lastActiveBackupPathChange = 0; _abPathIdx = ZT_MAX_PEER_NETWORK_PATHS; // rr @@ -1582,15 +1601,9 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT _maxAcceptablePacketLossRatio = 0.10f; _maxAcceptablePacketErrorRatio = 0.10f; - // General timers - - _lastFrame = 0; - _lastBackgroundTaskCheck = 0; - // balance-aware _totalBondUnderload = 0; - _overheadBytes = 0; /** @@ -1637,6 +1650,10 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT memcpy(_qw, templateBond->_qw, ZT_QOS_WEIGHT_SIZE * sizeof(float)); } + if (!_isLeaf) { + _policy = ZT_BOND_POLICY_ACTIVE_BACKUP; + } + // Timer geometry _monitorInterval = _failoverInterval / ZT_BOND_ECHOS_PER_FAILOVER_INTERVAL; @@ -1668,9 +1685,9 @@ std::string Bond::pathToStr(const SharedPtr& path) { #ifdef ZT_TRACE char pathStr[64] = { 0 }; - char fullPathStr[256] = { 0 }; + char fullPathStr[384] = { 0 }; path->address().toString(pathStr); - snprintf(fullPathStr, 256, "%.16llx-%s/%s", (unsigned long long)(path->localSocket()), getLink(path)->ifname().c_str(), pathStr); + snprintf(fullPathStr, 384, "%.16llx-%s/%s", (unsigned long long)(path->localSocket()), getLink(path)->ifname().c_str(), pathStr); return std::string(fullPathStr); #else return ""; @@ -1683,10 +1700,11 @@ void Bond::dumpPathStatus(int64_t now, int pathIdx) std::string aliveOrDead = _paths[pathIdx].alive ? std::string("alive") : std::string("dead"); std::string eligibleOrNot = _paths[pathIdx].eligible ? std::string("eligible") : std::string("ineligible"); std::string bondedOrNot = _paths[pathIdx].bonded ? std::string("bonded") : std::string("unbonded"); - log("path[%2d] --- %5s (%7d), %10s, %8s, flows=%-6d lat=%-8.3f pdv=%-7.3f err=%-6.4f loss=%-6.4f alloc=%-3d --- (%s)", + log("path[%2d] --- %5s (in %7d, out: %7d), %10s, %8s, flows=%-6d lat=%-8.3f pdv=%-7.3f err=%-6.4f loss=%-6.4f alloc=%-3d --- (%s)", pathIdx, aliveOrDead.c_str(), _paths[pathIdx].p->age(now), + (now - _paths[pathIdx].p->_lastOut), eligibleOrNot.c_str(), bondedOrNot.c_str(), _paths[pathIdx].assignedFlowCount, @@ -1709,13 +1727,14 @@ void Bond::dumpInfo(int64_t now, bool force) _lastSummaryDump = now; float overhead = (_overheadBytes / (timeSinceLastDump / 1000.0f) / 1000.0f); _overheadBytes = 0; - log("bond: bp=%d, fi=%d, mi=%d, ud=%d, dd=%d, flows=%lu, overhead=%f KB/s", + log("bond: bp=%d, fi=%d, mi=%d, ud=%d, dd=%d, flows=%lu, leaf=%d, overhead=%f KB/s", _policy, _failoverInterval, _monitorInterval, _upDelay, _downDelay, (unsigned long)_flows.size(), + _isLeaf, overhead); for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { if (_paths[i].p) { diff --git a/node/Bond.hpp b/node/Bond.hpp index 007793fbd..b30d3a90f 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -761,6 +761,11 @@ class Bond { */ void dequeueNextActiveBackupPath(uint64_t now); + /** + * Zero all timers + */ + void initTimers(); + /** * Set bond parameters to reasonable defaults, these may later be overwritten by * user-specified parameters. @@ -1135,7 +1140,6 @@ class Bond { /** * Emit message to tracing system but with added timestamp and subsystem info * - * TODO: Will be replaced when better logging facilities exist in Trace.hpp */ void log(const char* fmt, ...) { @@ -1165,7 +1169,6 @@ class Bond { /** * Emit message to tracing system but with added timestamp and subsystem info * - * TODO: Will be replaced when better logging facilities exist in Trace.hpp */ void debug(const char* fmt, ...) { @@ -1499,6 +1502,7 @@ class Bond { unsigned char _freeRandomByte; // Free byte of entropy that is updated on every packet egress event. SharedPtr _peer; // Remote peer that this bond services unsigned long long _peerId; // ID of the peer that this bond services + bool _isLeaf; /** * Rate-limiting diff --git a/node/Peer.hpp b/node/Peer.hpp index 2368e9318..3bfe31950 100644 --- a/node/Peer.hpp +++ b/node/Peer.hpp @@ -568,6 +568,8 @@ private: Mutex _paths_m; Mutex _bond_m; + bool _isLeaf; + Identity _id; unsigned int _directPathPushCutoffCount;