Force non-leaf peers into local active-backup bond when multipath is enabled

This commit is contained in:
Joseph Henry 2022-02-17 15:16:33 -08:00
parent 5d4a9a4aa1
commit 1b0c183913
No known key found for this signature in database
GPG Key ID: C45B33FF5EBC9344
3 changed files with 62 additions and 37 deletions

View File

@ -189,16 +189,19 @@ void Bond::processBackgroundTasks(void* tPtr, const int64_t now)
Bond::Bond(const RuntimeEnvironment* renv) : RR(renv)
{
initTimers();
}
Bond::Bond(const RuntimeEnvironment* renv, int policy, const SharedPtr<Peer>& peer) : RR(renv), _freeRandomByte((unsigned char)((uintptr_t)this >> 4) ^ ++s_freeRandomByteCounter), _peer(peer), _peerId(_peer->_id.address().toInt())
{
initTimers();
setBondParameters(policy, SharedPtr<Bond>(), false);
_policyAlias = getPolicyStrByCode(policy);
}
Bond::Bond(const RuntimeEnvironment* renv, std::string& basePolicy, std::string& policyAlias, const SharedPtr<Peer>& peer) : RR(renv), _policyAlias(policyAlias), _peer(peer)
{
initTimers();
setBondParameters(getPolicyCodeByStr(basePolicy), SharedPtr<Bond>(), false);
}
@ -208,6 +211,7 @@ Bond::Bond(const RuntimeEnvironment* renv, SharedPtr<Bond> originalBond, const S
, _peer(peer)
, _peerId(_peer->_id.address().toInt())
{
initTimers();
setBondParameters(originalBond->_policy, originalBond, true);
}
@ -756,20 +760,22 @@ void Bond::processBackgroundBondTasks(void* tPtr, int64_t now)
// Send ambient monitoring traffic
for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
if (_paths[i].p && _paths[i].allowed()) {
if ((_monitorInterval > 0) && ((now - _paths[i].p->_lastOut) >= _monitorInterval)) {
if ((_peer->remoteVersionProtocol() >= 5) && (! ((_peer->remoteVersionMajor() == 1) && (_peer->remoteVersionMinor() == 1) && (_peer->remoteVersionRevision() == 0)))) {
Packet outp(_peer->address(), RR->identity.address(), Packet::VERB_ECHO); // ECHO (this is our bond's heartbeat)
outp.armor(_peer->key(), true, _peer->aesKeysIfSupported());
RR->node->expectReplyTo(outp.packetId());
RR->node->putPacket(tPtr, _paths[i].p->localSocket(), _paths[i].p->address(), outp.data(), outp.size());
_paths[i].p->_lastOut = now;
_overheadBytes += outp.size();
debug("sent ECHO via link %s", pathToStr(_paths[i].p).c_str());
if (_isLeaf) {
if ((_monitorInterval > 0) && (((now - _paths[i].p->_lastIn) >= _monitorInterval) /*|| ((now - _paths[i].p->_lastOut) >= _monitorInterval)*/)) {
if ((_peer->remoteVersionProtocol() >= 5) && (! ((_peer->remoteVersionMajor() == 1) && (_peer->remoteVersionMinor() == 1) && (_peer->remoteVersionRevision() == 0)))) {
Packet outp(_peer->address(), RR->identity.address(), Packet::VERB_ECHO); // ECHO (this is our bond's heartbeat)
outp.armor(_peer->key(), true, _peer->aesKeysIfSupported());
RR->node->expectReplyTo(outp.packetId());
RR->node->putPacket(tPtr, _paths[i].p->localSocket(), _paths[i].p->address(), outp.data(), outp.size());
_paths[i].p->_lastOut = now;
_overheadBytes += outp.size();
debug("tx: verb 0x%-2x of len %4d via %s (ECHO)", Packet::VERB_ECHO, outp.size(), pathToStr(_paths[i].p).c_str());
}
}
// QOS
if (_paths[i].needsToSendQoS(now, _qosSendInterval)) {
sendQOS_MEASUREMENT(tPtr, i, _paths[i].p->localSocket(), _paths[i].p->address(), now);
}
}
// QOS
if (_paths[i].needsToSendQoS(now, _qosSendInterval)) {
sendQOS_MEASUREMENT(tPtr, i, _paths[i].p->localSocket(), _paths[i].p->address(), now);
}
}
}
@ -799,6 +805,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
{
uint8_t tmpNumAliveLinks = 0;
uint8_t tmpNumTotalLinks = 0;
/**
* Update path state variables. State variables are used so that critical
* blocks that perform fast packet processing won't need to make as many
@ -816,14 +823,14 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
/**
* Determine aliveness
*/
_paths[i].alive = (now - _paths[i].p->_lastIn) < _failoverInterval;
_paths[i].alive = _isLeaf ? (now - _paths[i].p->_lastIn) < _failoverInterval : (now - _paths[i].p->_lastIn) < ZT_PEER_PATH_EXPIRATION;
/**
* Determine current eligibility
*/
bool currEligibility = false;
// Simple RX age (driven by packets of any type and gratuitous VERB_HELLOs)
bool acceptableAge = _paths[i].p->age(now) < (_failoverInterval + _downDelay);
bool acceptableAge = _isLeaf ? (_paths[i].p->age(now) < (_failoverInterval + _downDelay)) : _paths[i].alive;
// Whether we've waited long enough since the link last came online
bool satisfiedUpDelay = (now - _paths[i].lastAliveToggle) >= _upDelay;
// Whether this path is still in its trial period
@ -1515,6 +1522,22 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
}
}
void Bond::initTimers()
{
_lastFlowExpirationCheck = 0;
_lastFlowRebalance = 0;
_lastSentPathNegotiationRequest = 0;
_lastPathNegotiationCheck = 0;
_lastPathNegotiationReceived = 0;
_lastQoSRateCheck = 0;
_lastQualityEstimation = 0;
_lastBondStatusLog = 0;
_lastSummaryDump = 0;
_lastActiveBackupPathChange = 0;
_lastFrame = 0;
_lastBackgroundTaskCheck = 0;
}
void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useTemplate)
{
// Sanity check for policy
@ -1522,27 +1545,26 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
_defaultPolicy = (_defaultPolicy <= ZT_BOND_POLICY_NONE || _defaultPolicy > ZT_BOND_POLICY_BALANCE_AWARE) ? ZT_BOND_POLICY_NONE : _defaultPolicy;
_policy = (policy <= ZT_BOND_POLICY_NONE || policy > ZT_BOND_POLICY_BALANCE_AWARE) ? ZT_BOND_POLICY_NONE : _defaultPolicy;
// Check if non-leaf to prevent spamming infrastructure
if (_peer) {
ZT_PeerRole role = RR->topology->role(_peer->address());
_isLeaf = (role != ZT_PEER_ROLE_PLANET && role != ZT_PEER_ROLE_MOON);
}
// Flows
_lastFlowExpirationCheck = 0;
_lastFlowRebalance = 0;
_allowFlowHashing = false;
// Path negotiation
_lastSentPathNegotiationRequest = 0;
_lastPathNegotiationCheck = 0;
_allowPathNegotiation = false;
_pathNegotiationCutoffCount = 0;
_lastPathNegotiationReceived = 0;
_localUtility = 0;
_negotiatedPathIdx = 0;
// QOS Verb (and related checks)
_qosCutoffCount = 0;
_lastQoSRateCheck = 0;
_lastQualityEstimation = 0;
// User preferences which may override the default bonding algorithm's behavior
@ -1552,8 +1574,6 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
// Bond status
_lastBondStatusLog = 0;
_lastSummaryDump = 0;
_isHealthy = false;
_numAliveLinks = 0;
_numTotalLinks = 0;
@ -1561,7 +1581,6 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
// active-backup
_lastActiveBackupPathChange = 0;
_abPathIdx = ZT_MAX_PEER_NETWORK_PATHS;
// rr
@ -1582,15 +1601,9 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
_maxAcceptablePacketLossRatio = 0.10f;
_maxAcceptablePacketErrorRatio = 0.10f;
// General timers
_lastFrame = 0;
_lastBackgroundTaskCheck = 0;
// balance-aware
_totalBondUnderload = 0;
_overheadBytes = 0;
/**
@ -1637,6 +1650,10 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
memcpy(_qw, templateBond->_qw, ZT_QOS_WEIGHT_SIZE * sizeof(float));
}
if (!_isLeaf) {
_policy = ZT_BOND_POLICY_ACTIVE_BACKUP;
}
// Timer geometry
_monitorInterval = _failoverInterval / ZT_BOND_ECHOS_PER_FAILOVER_INTERVAL;
@ -1668,9 +1685,9 @@ std::string Bond::pathToStr(const SharedPtr<Path>& path)
{
#ifdef ZT_TRACE
char pathStr[64] = { 0 };
char fullPathStr[256] = { 0 };
char fullPathStr[384] = { 0 };
path->address().toString(pathStr);
snprintf(fullPathStr, 256, "%.16llx-%s/%s", (unsigned long long)(path->localSocket()), getLink(path)->ifname().c_str(), pathStr);
snprintf(fullPathStr, 384, "%.16llx-%s/%s", (unsigned long long)(path->localSocket()), getLink(path)->ifname().c_str(), pathStr);
return std::string(fullPathStr);
#else
return "";
@ -1683,10 +1700,11 @@ void Bond::dumpPathStatus(int64_t now, int pathIdx)
std::string aliveOrDead = _paths[pathIdx].alive ? std::string("alive") : std::string("dead");
std::string eligibleOrNot = _paths[pathIdx].eligible ? std::string("eligible") : std::string("ineligible");
std::string bondedOrNot = _paths[pathIdx].bonded ? std::string("bonded") : std::string("unbonded");
log("path[%2d] --- %5s (%7d), %10s, %8s, flows=%-6d lat=%-8.3f pdv=%-7.3f err=%-6.4f loss=%-6.4f alloc=%-3d --- (%s)",
log("path[%2d] --- %5s (in %7d, out: %7d), %10s, %8s, flows=%-6d lat=%-8.3f pdv=%-7.3f err=%-6.4f loss=%-6.4f alloc=%-3d --- (%s)",
pathIdx,
aliveOrDead.c_str(),
_paths[pathIdx].p->age(now),
(now - _paths[pathIdx].p->_lastOut),
eligibleOrNot.c_str(),
bondedOrNot.c_str(),
_paths[pathIdx].assignedFlowCount,
@ -1709,13 +1727,14 @@ void Bond::dumpInfo(int64_t now, bool force)
_lastSummaryDump = now;
float overhead = (_overheadBytes / (timeSinceLastDump / 1000.0f) / 1000.0f);
_overheadBytes = 0;
log("bond: bp=%d, fi=%d, mi=%d, ud=%d, dd=%d, flows=%lu, overhead=%f KB/s",
log("bond: bp=%d, fi=%d, mi=%d, ud=%d, dd=%d, flows=%lu, leaf=%d, overhead=%f KB/s",
_policy,
_failoverInterval,
_monitorInterval,
_upDelay,
_downDelay,
(unsigned long)_flows.size(),
_isLeaf,
overhead);
for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
if (_paths[i].p) {

View File

@ -761,6 +761,11 @@ class Bond {
*/
void dequeueNextActiveBackupPath(uint64_t now);
/**
* Zero all timers
*/
void initTimers();
/**
* Set bond parameters to reasonable defaults, these may later be overwritten by
* user-specified parameters.
@ -1135,7 +1140,6 @@ class Bond {
/**
* Emit message to tracing system but with added timestamp and subsystem info
*
* TODO: Will be replaced when better logging facilities exist in Trace.hpp
*/
void log(const char* fmt, ...)
{
@ -1165,7 +1169,6 @@ class Bond {
/**
* Emit message to tracing system but with added timestamp and subsystem info
*
* TODO: Will be replaced when better logging facilities exist in Trace.hpp
*/
void debug(const char* fmt, ...)
{
@ -1499,6 +1502,7 @@ class Bond {
unsigned char _freeRandomByte; // Free byte of entropy that is updated on every packet egress event.
SharedPtr<Peer> _peer; // Remote peer that this bond services
unsigned long long _peerId; // ID of the peer that this bond services
bool _isLeaf;
/**
* Rate-limiting

View File

@ -568,6 +568,8 @@ private:
Mutex _paths_m;
Mutex _bond_m;
bool _isLeaf;
Identity _id;
unsigned int _directPathPushCutoffCount;