From 5f0ee4fc78f438d00f382b4e29ebdde621e7e160 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Tue, 16 Jun 2020 12:30:21 -0700 Subject: [PATCH] Fix invalid defaultBondingPolicy conditions, Add ZT_MultipathFlowRebalanceStrategy, Add basic hysteresis mechanism to flow re-assignment --- include/ZeroTierOne.h | 27 ++++ node/Bond.cpp | 270 ++++++++++++++++++---------------------- node/Bond.hpp | 10 +- node/BondController.cpp | 5 +- node/Constants.hpp | 5 - node/Flow.hpp | 1 + node/IncomingPacket.cpp | 2 + node/Peer.cpp | 3 +- service/OneService.cpp | 1 + 9 files changed, 166 insertions(+), 158 deletions(-) diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h index 890e56048..dfb520469 100644 --- a/include/ZeroTierOne.h +++ b/include/ZeroTierOne.h @@ -521,6 +521,33 @@ enum ZT_MultipathMonitorStrategy ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC = 3 }; +/** + * Strategy for re-balancing protocol flows + */ +enum ZT_MultipathFlowRebalanceStrategy +{ + /** + * Flows will only be re-balanced among slaves during + * assignment or failover. This minimizes the possibility + * of sequence reordering and is thus the default setting. + */ + ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_PASSIVE = 0, + + /** + * Flows that are active may be re-assigned to a new more + * suitable slave if it can be done without disrupting the flow. + * This setting can sometimes cause sequence re-ordering. + */ + ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_OPPORTUNISTIC = 0, + + /** + * Flows will be continuously re-assigned the most suitable slave + * in order to maximize "balance". This can often cause sequence + * reordering and is thus only reccomended for protocols like UDP. + */ + ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_AGGRESSIVE = 2 +}; + /** * Indices for the path quality weight vector */ diff --git a/node/Bond.cpp b/node/Bond.cpp index 9a2f5c267..656285925 100644 --- a/node/Bond.cpp +++ b/node/Bond.cpp @@ -25,6 +25,10 @@ Bond::Bond(const RuntimeEnvironment *renv, int policy, const SharedPtr& pe RR(renv), _peer(peer) { + // TODO: Remove for production + _header=false; + _lastLogTS = RR->node->now(); + _lastPrintTS = RR->node->now(); setReasonableDefaults(policy, SharedPtr(), false); _policyAlias = BondController::getPolicyStrByCode(policy); } @@ -41,6 +45,10 @@ Bond::Bond(const RuntimeEnvironment *renv, SharedPtr originalBond, const S RR(renv), _peer(peer) { + // TODO: Remove for production + _header=false; + _lastLogTS = RR->node->now(); + _lastPrintTS = RR->node->now(); setReasonableDefaults(originalBond->_bondingPolicy, originalBond, true); } @@ -162,7 +170,7 @@ SharedPtr Bond::getAppropriatePath(int64_t now, int32_t flowId) void Bond::recordIncomingInvalidPacket(const SharedPtr& path) { - //char pathStr[128];path->address().toString(pathStr);fprintf(stderr, "recordIncomingInvalidPacket() %s %s\n", getSlave(path)->ifname().c_str(), pathStr); + // char pathStr[128];path->address().toString(pathStr);fprintf(stderr, "recordIncomingInvalidPacket() %s %s\n", getSlave(path)->ifname().c_str(), pathStr); Mutex::Lock _l(_paths_m); for (int i=0; i& path) void Bond::recordOutgoingPacket(const SharedPtr &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now) { - //char pathStr[128];path->address().toString(pathStr);fprintf(stderr, "recordOutgoingPacket() %s %s, packetId=%llx, payloadLength=%d, verb=%x, flowId=%lx\n", getSlave(path)->ifname().c_str(), pathStr, packetId, payloadLength, verb, flowId); + // char pathStr[128];path->address().toString(pathStr);fprintf(stderr, "recordOutgoingPacket() %s %s, packetId=%llx, payloadLength=%d, verb=%x, flowId=%lx\n", getSlave(path)->ifname().c_str(), pathStr, packetId, payloadLength, verb, flowId); _freeRandomByte += (unsigned char)(packetId >> 8); // Grab entropy to use in path selection logic if (!_shouldCollectPathStatistics) { return; @@ -320,6 +328,7 @@ bool Bond::assignFlowToBondedPath(SharedPtr &flow, int64_t now) idx = abs((int)(flow->id() % (_numBondedPaths))); //fprintf(stderr, "flow->id()=%d, %x, _numBondedPaths=%d, idx=%d\n", flow->id(), flow->id(), _numBondedPaths, idx); flow->assignPath(_paths[_bondedIdx[idx]],now); + ++(_paths[_bondedIdx[idx]]->_assignedFlowCount); } if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) { unsigned char entropy; @@ -341,29 +350,32 @@ bool Bond::assignFlowToBondedPath(SharedPtr &flow, int64_t now) totalIncompleteAllocation += _paths[i]->_allocation; } } - fprintf(stderr, "entropy = %d, totalIncompleteAllocation=%d\n", entropy, totalIncompleteAllocation); + //fprintf(stderr, "entropy = %d, totalIncompleteAllocation=%d\n", entropy, totalIncompleteAllocation); entropy %= totalIncompleteAllocation; - fprintf(stderr, "new entropy = %d\n", entropy); + //fprintf(stderr, "new entropy = %d\n", entropy); for(unsigned int i=0;ibonded()) { SharedPtr slave = RR->bc->getSlaveBySocket(_policyAlias, _paths[i]->localSocket()); _paths[i]->address().toString(curPathStr); uint8_t probabilitySegment = (_totalBondUnderload > 0) ? _paths[i]->_affinity : _paths[i]->_allocation; - fprintf(stderr, "i=%2d, entropy=%3d, alloc=%3d, byteload=%4d, segment=%3d, _totalBondUnderload=%3d, ifname=%s, path=%20s\n", i, entropy, _paths[i]->_allocation, _paths[i]->_relativeByteLoad, probabilitySegment, _totalBondUnderload, slave->ifname().c_str(), curPathStr); + //fprintf(stderr, "i=%2d, entropy=%3d, alloc=%3d, byteload=%4d, segment=%3d, _totalBondUnderload=%3d, ifname=%s, path=%20s\n", i, entropy, _paths[i]->_allocation, _paths[i]->_relativeByteLoad, probabilitySegment, _totalBondUnderload, slave->ifname().c_str(), curPathStr); if (entropy <= probabilitySegment) { idx = i; - fprintf(stderr, "\t is best path\n"); + //fprintf(stderr, "\t is best path\n"); break; } entropy -= probabilitySegment; } } if (idx < ZT_MAX_PEER_NETWORK_PATHS) { + if (flow->_assignedPath) { + flow->_previouslyAssignedPath = flow->_assignedPath; + } flow->assignPath(_paths[idx],now); ++(_paths[idx]->_assignedFlowCount); } else { - fprintf(stderr, "could not assign flow?\n"); exit(0); // TODO: Remove + fprintf(stderr, "could not assign flow?\n"); exit(0); // TODO: Remove for production return false; } } @@ -397,6 +409,7 @@ SharedPtr Bond::createFlow(const SharedPtr &path, int32_t flowId, un if (path) { flow->assignPath(path,now); path->address().toString(curPathStr); + path->_assignedFlowCount++; SharedPtr slave = RR->bc->getSlaveBySocket(_policyAlias, flow->assignedPath()->localSocket()); fprintf(stderr, "assigned (rx) flow %x with peer %llx to path %s on %s\n", flow->id(), _peer->_id.address().toInt(), curPathStr, slave->ifname().c_str()); } @@ -818,7 +831,7 @@ void Bond::curateBond(const int64_t now, bool rebuildBond) ++it; ++updatedBondedPathCount; _paths[_bondedIdx[i]]->address().toString(pathStr); - fprintf(stderr, "setting i=%d, _bondedIdx[%d]=%d to bonded (%s %s)\n", i, i, _bondedIdx[i], getSlave(_paths[_bondedIdx[i]])->ifname().c_str(), pathStr); + //fprintf(stderr, "setting i=%d, _bondedIdx[%d]=%d to bonded (%s %s)\n", i, i, _bondedIdx[i], getSlave(_paths[_bondedIdx[i]])->ifname().c_str(), pathStr); } } _numBondedPaths = updatedBondedPathCount; @@ -834,8 +847,6 @@ void Bond::curateBond(const int64_t now, bool rebuildBond) void Bond::estimatePathQuality(const int64_t now) { char pathStr[128]; - //--- - uint32_t totUserSpecifiedSlaveSpeed = 0; if (_numBondedPaths) { // Compute relative user-specified speeds of slaves for(unsigned int i=0;i<_numBondedPaths;++i) { @@ -856,17 +867,11 @@ void Bond::estimatePathQuality(const int64_t now) float pdv[ZT_MAX_PEER_NETWORK_PATHS]; float plr[ZT_MAX_PEER_NETWORK_PATHS]; float per[ZT_MAX_PEER_NETWORK_PATHS]; - float thr[ZT_MAX_PEER_NETWORK_PATHS]; - float thm[ZT_MAX_PEER_NETWORK_PATHS]; - float thv[ZT_MAX_PEER_NETWORK_PATHS]; float maxLAT = 0; float maxPDV = 0; float maxPLR = 0; float maxPER = 0; - float maxTHR = 0; - float maxTHM = 0; - float maxTHV = 0; float quality[ZT_MAX_PEER_NETWORK_PATHS]; uint8_t alloc[ZT_MAX_PEER_NETWORK_PATHS]; @@ -877,9 +882,6 @@ void Bond::estimatePathQuality(const int64_t now) memset(&pdv, 0, sizeof(pdv)); memset(&plr, 0, sizeof(plr)); memset(&per, 0, sizeof(per)); - memset(&thr, 0, sizeof(thr)); - memset(&thm, 0, sizeof(thm)); - memset(&thv, 0, sizeof(thv)); memset(&quality, 0, sizeof(quality)); memset(&alloc, 0, sizeof(alloc)); @@ -901,24 +903,6 @@ void Bond::estimatePathQuality(const int64_t now) _paths[i]->_throughputVariance = 0; } } - /* - else { - // Use estimated metrics - if (_paths[i]->throughputSamples.count()) { - // If we have samples, use them - _paths[i]->throughputMean = (uint64_t)_paths[i]->throughputSamples.mean(); - if (_paths[i]->throughputMean > 0) { - _paths[i]->throughputVarianceSamples.push((float)_paths[i]->throughputSamples.stddev() / (float)_paths[i]->throughputMean); - _paths[i]->throughputVariance = _paths[i]->throughputVarianceSamples.mean(); - } - } - else { - // No samples have been collected yet, assume best case scenario - _paths[i]->throughputMean = ZT_QOS_THR_NORM_MAX; - _paths[i]->throughputVariance = 0; - } - } - */ // Drain unacknowledged QoS records std::map::iterator it = _paths[i]->qosStatsOut.begin(); uint64_t currentLostRecords = 0; @@ -934,23 +918,16 @@ void Bond::estimatePathQuality(const int64_t now) quality[i]=0; totQuality=0; // Normalize raw observations according to sane limits and/or user specified values - lat[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_latencyMean, 0, _maxAcceptableLatency, 0, 1)); - pdv[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_latencyVariance, 0, _maxAcceptablePacketDelayVariance, 0, 1)); - plr[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_packetLossRatio, 0, _maxAcceptablePacketLossRatio, 0, 1)); - per[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_packetErrorRatio, 0, _maxAcceptablePacketErrorRatio, 0, 1)); - //thr[i] = 1.0; //Utils::normalize(_paths[i]->throughputMean, 0, ZT_QOS_THR_NORM_MAX, 0, 1); - //thm[i] = 1.0; //Utils::normalize(_paths[i]->throughputMax, 0, ZT_QOS_THM_NORM_MAX, 0, 1); - //thv[i] = 1.0; //1.0 / expf(4*Utils::normalize(_paths[i]->throughputVariance, 0, ZT_QOS_THV_NORM_MAX, 0, 1)); + lat[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_latencyMean, 0, _maxAcceptableLatency, 0, 1)); + pdv[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_latencyVariance, 0, _maxAcceptablePacketDelayVariance, 0, 1)); + plr[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_packetLossRatio, 0, _maxAcceptablePacketLossRatio, 0, 1)); + per[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_packetErrorRatio, 0, _maxAcceptablePacketErrorRatio, 0, 1)); //scp[i] = _paths[i]->ipvPref != 0 ? 1.0 : Utils::normalize(_paths[i]->ipScope(), InetAddress::IP_SCOPE_NONE, InetAddress::IP_SCOPE_PRIVATE, 0, 1); // Record bond-wide maximums to determine relative values maxLAT = lat[i] > maxLAT ? lat[i] : maxLAT; maxPDV = pdv[i] > maxPDV ? pdv[i] : maxPDV; maxPLR = plr[i] > maxPLR ? plr[i] : maxPLR; maxPER = per[i] > maxPER ? per[i] : maxPER; - //maxTHR = thr[i] > maxTHR ? thr[i] : maxTHR; - //maxTHM = thm[i] > maxTHM ? thm[i] : maxTHM; - //maxTHV = thv[i] > maxTHV ? thv[i] : maxTHV; - //fprintf(stdout, "EH %d: lat=%8.3f, ltm=%8.3f, pdv=%8.3f, plr=%5.3f, per=%5.3f, thr=%8f, thm=%5.3f, thv=%5.3f, avl=%5.3f, age=%8.2f, scp=%4d, q=%5.3f, qtot=%5.3f, ac=%d if=%s, path=%s\n", // i, lat[i], ltm[i], pdv[i], plr[i], per[i], thr[i], thm[i], thv[i], avl[i], age[i], scp[i], quality[i], totQuality, alloc[i], getSlave(_paths[i])->ifname().c_str(), pathStr); @@ -962,9 +939,6 @@ void Bond::estimatePathQuality(const int64_t now) quality[i] += ((maxPDV > 0.0f ? pdv[i] / maxPDV : 0.0f) * _qualityWeights[ZT_QOS_PDV_IDX]); quality[i] += ((maxPLR > 0.0f ? plr[i] / maxPLR : 0.0f) * _qualityWeights[ZT_QOS_PLR_IDX]); quality[i] += ((maxPER > 0.0f ? per[i] / maxPER : 0.0f) * _qualityWeights[ZT_QOS_PER_IDX]); - //quality[i] += ((maxTHR > 0.0f ? thr[i] / maxTHR : 0.0f) * _qualityWeights[ZT_QOS_THR_IDX]); - //quality[i] += ((maxTHM > 0.0f ? thm[i] / maxTHM : 0.0f) * _qualityWeights[ZT_QOS_THM_IDX]); - //quality[i] += ((maxTHV > 0.0f ? thv[i] / maxTHV : 0.0f) * _qualityWeights[ZT_QOS_THV_IDX]); //quality[i] += (scp[i] * _qualityWeights[ZT_QOS_SCP_IDX]); totQuality += quality[i]; } @@ -1007,6 +981,7 @@ void Bond::estimatePathQuality(const int64_t now) } _header=true; } + /* fprintf(stdout, "%ld, %d, %d, %d, ",((now - RR->bc->getBondStartTime())),_numBondedPaths,_totalBondUnderload, _flows.size()); for(unsigned int i=0;iifname().c_str(), pathStr, _paths[i]->_latencyMean, lat[i],pdv[i], _paths[i]->_packetLossRatio, plr[i],per[i],thr[i],thm[i],thv[i],(now - _paths[i]->lastIn()),quality[i],alloc[i], _paths[i]->_relativeByteLoad, _paths[i]->_assignedFlowCount, _paths[i]->alive(now, true), _paths[i]->eligible(now,_ackSendInterval), _paths[i]->qosStatsOut.size()); } - } - fprintf(stdout, "\n"); + }*/ + //fprintf(stdout, "\n"); } } void Bond::processBalanceTasks(const int64_t now) { - //fprintf(stderr, "processBalanceTasks\n"); char curPathStr[128]; + + // TODO: Generalize + int totalAllocation = 0; + for (int i=0;ibonded() && _paths[i]->eligible(now,_ackSendInterval)) { + totalAllocation+=_paths[i]->_allocation; + } + } + unsigned char minimumAllocationValue = 0.33 * ((float)totalAllocation / (float)_numBondedPaths); + if (_allowFlowHashing) { /** * Clean up and reset flows if necessary @@ -1067,6 +1054,32 @@ void Bond::processBalanceTasks(const int64_t now) } } } + /** + * Re-allocate flows from under-performing + * NOTE: This could be part of the above block but was kept separate for clarity. + */ + if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) { + Mutex::Lock _l(_flows_m); + for (int i=0;ibonded() && _paths[i]->eligible(now,_ackSendInterval) && (_paths[i]->_allocation < minimumAllocationValue) && _paths[i]->_assignedFlowCount) { + _paths[i]->address().toString(curPathStr); + fprintf(stderr, "%d reallocating flows from under-performing path %s on %s\n", (RR->node->now() - RR->bc->getBondStartTime()), curPathStr, getSlave(_paths[i])->ifname().c_str()); + std::map >::iterator flow_it = _flows.begin(); + while (flow_it != _flows.end()) { + if (flow_it->second->assignedPath() == _paths[i]) { + if(assignFlowToBondedPath(flow_it->second, now)) { + _paths[i]->_assignedFlowCount--; + } + } + ++flow_it; + } + _paths[i]->_shouldReallocateFlows = false; + } + } + } } /** * Tasks specific to (Balance Round Robin) @@ -1091,70 +1104,47 @@ void Bond::processBalanceTasks(const int64_t now) if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) { if (_allowFlowHashing) { Mutex::Lock _l(_flows_m); - /** - * Re-balance flows in proportion to slave capacity (or when eligibility changes) - */ - if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) { + if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_PASSIVE) { + // Do nothing here, this is taken care of in the more general case above. + } + if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_OPPORTUNISTIC) { + // If the flow is temporarily inactive we should take this opportunity to re-assign the flow if needed. + } + if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_AGGRESSIVE) { /** - * Determine "load" for bonded paths + * Return flows to the original path if it has once again become available */ - uint64_t totalBytes = 0; - for(unsigned int i=0;ibonded()) { - _paths[i]->_byteLoad = 0; - std::map >::iterator flow_it = _flows.begin(); - while (flow_it != _flows.end()) { - if (flow_it->second->assignedPath() == _paths[i]) { - _paths[i]->_byteLoad += flow_it->second->totalBytes(); - } - ++flow_it; + if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) { + std::map >::iterator flow_it = _flows.begin(); + while (flow_it != _flows.end()) { + if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval) + && (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) { + fprintf(stderr, "moving flow back onto its previous path assignment (based on eligibility)\n"); + (flow_it->second->_assignedPath->_assignedFlowCount)--; + flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath,now); + (flow_it->second->_previouslyAssignedPath->_assignedFlowCount)++; } - totalBytes += _paths[i]->_byteLoad; + ++flow_it; } + _lastFlowRebalance = now; } /** - * Determine "affinity" for bonded path + * Return flows to the original path if it has once again become (performant) */ - //fprintf(stderr, "\n\n"); - - _totalBondUnderload = 0; -/* - for(unsigned int i=0;ibonded()) { - if (totalBytes) { - uint8_t relativeByteLoad = std::ceil(((float)_paths[i]->_byteLoad / (float)totalBytes) * (float)255); - //fprintf(stderr, "lastComputedAllocation = %d\n", _paths[i]->allocation); - //fprintf(stderr, " relativeByteLoad = %d\n", relativeByteLoad); - _paths[i]->_relativeByteLoad = relativeByteLoad; - uint8_t relativeUnderload = std::max(0, (int)_paths[i]->_allocation - (int)relativeByteLoad); - //fprintf(stderr, " relativeUnderload = %d\n", relativeUnderload); - _totalBondUnderload += relativeUnderload; - //fprintf(stderr, " _totalBondUnderload = %d\n\n", _totalBondUnderload); - //_paths[i]->affinity = (relativeUnderload > 0 ? relativeUnderload : _paths[i]->_allocation); - } - else { // set everything to base values - _totalBondUnderload = 0; - //_paths[i]->affinity = 0; + if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) { + std::map >::iterator flow_it = _flows.begin(); + while (flow_it != _flows.end()) { + if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval) + && (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) { + fprintf(stderr, "moving flow back onto its previous path assignment (based on performance)\n"); + (flow_it->second->_assignedPath->_assignedFlowCount)--; + flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath,now); + (flow_it->second->_previouslyAssignedPath->_assignedFlowCount)++; } + ++flow_it; } + _lastFlowRebalance = now; } -*/ - //fprintf(stderr, "_totalBondUnderload=%d (end)\n\n", _totalBondUnderload); - - /** - * - */ - //fprintf(stderr, "_lastFlowRebalance\n"); - std::map >::iterator it = _flows.begin(); - while (it != _flows.end()) { - int32_t flowId = it->first; - SharedPtr flow = it->second; - if ((now - flow->_lastPathReassignment) > ZT_FLOW_MIN_REBALANCE_INTERVAL) { - //fprintf(stdout, " could move : %x\n", flowId); - } - ++it; - } - _lastFlowRebalance = now; } } else if (!_allowFlowHashing) { @@ -1440,7 +1430,7 @@ void Bond::processActiveBackupTasks(const int64_t now) if (!_abFailoverQueue.empty()) { fprintf(stderr, "%llu AB: (failure) there are (%lu) slaves in queue to choose from...\n", ((now - RR->bc->getBondStartTime())), _abFailoverQueue.size()); dequeueNextActiveBackupPath(now); - _abPath->address().toString(curPathStr); fprintf(stderr, "%llu sAB: (failure) switched to %s on %s\n", ((now - RR->bc->getBondStartTime())), curPathStr, getSlave(_abPath)->ifname().c_str()); + _abPath->address().toString(curPathStr); fprintf(stderr, "%llu AB: (failure) switched to %s on %s\n", ((now - RR->bc->getBondStartTime())), curPathStr, getSlave(_abPath)->ifname().c_str()); } else { fprintf(stderr, "%llu AB: (failure) nothing available in the slave queue, doing nothing.\n", ((now - RR->bc->getBondStartTime()))); } @@ -1515,12 +1505,16 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool _bondingPolicy= policy; } + _freeRandomByte = 0; + _lastCheckUserPreferences = 0; + _lastBackgroundTaskCheck = 0; + _downDelay = 0; _upDelay = 0; _allowFlowHashing=false; _bondMonitorInterval=0; _shouldCollectPathStatistics=false; - _lastBackgroundTaskCheck=0; + // Path negotiation _allowPathNegotiation=false; @@ -1539,7 +1533,7 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool _lastFlowRebalance=0; _totalBondUnderload = 0; - //_maxAcceptableLatency + _maxAcceptableLatency = 100; _maxAcceptablePacketDelayVariance = 50; _maxAcceptablePacketLossRatio = 0.10; _maxAcceptablePacketErrorRatio = 0.10; @@ -1547,17 +1541,18 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool _lastFrame=0; - // TODO: Remove - _header=false; - _lastLogTS = RR->node->now(); - _lastPrintTS = RR->node->now(); + + + /* ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_PASSIVE is the most conservative strategy and is + least likely to cause unexpected behavior */ + _flowRebalanceStrategy = ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_AGGRESSIVE; /** * Paths are actively monitored to provide a real-time quality/preference-ordered rapid failover queue. */ switch (policy) { case ZT_BONDING_POLICY_ACTIVE_BACKUP: - _failoverInterval = 5000; + _failoverInterval = 500; _abSlaveSelectMethod = ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE; _slaveMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC; _qualityWeights[ZT_QOS_LAT_IDX] = 0.2f; @@ -1581,7 +1576,7 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool * Paths are monitored to determine when/if one needs to be added or removed from the rotation */ case ZT_BONDING_POLICY_BALANCE_RR: - _failoverInterval = 5000; + _failoverInterval = 500; _allowFlowHashing = false; _packetsPerSlave = 1024; _slaveMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC; @@ -1600,8 +1595,8 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool * path and where to place the next flow. */ case ZT_BONDING_POLICY_BALANCE_XOR: - _failoverInterval = 5000;; - _upDelay=_bondMonitorInterval*2; + _failoverInterval = 500; + _upDelay = _bondMonitorInterval * 2; _allowFlowHashing = true; _slaveMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC; _qualityWeights[ZT_QOS_LAT_IDX] = 0.4f; @@ -1623,13 +1618,13 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool _failoverInterval = 3000; _allowFlowHashing = true; _slaveMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC; - _qualityWeights[ZT_QOS_LAT_IDX] = 0.3f; + _qualityWeights[ZT_QOS_LAT_IDX] = 0.4f; _qualityWeights[ZT_QOS_LTM_IDX] = 0.0f; - _qualityWeights[ZT_QOS_PDV_IDX] = 0.1f; - _qualityWeights[ZT_QOS_PLR_IDX] = 0.1f; - _qualityWeights[ZT_QOS_PER_IDX] = 0.1f; + _qualityWeights[ZT_QOS_PDV_IDX] = 0.4f; + _qualityWeights[ZT_QOS_PLR_IDX] = 0.2f; + _qualityWeights[ZT_QOS_PER_IDX] = 0.0f; _qualityWeights[ZT_QOS_THR_IDX] = 0.0f; - _qualityWeights[ZT_QOS_THM_IDX] = 0.4f; + _qualityWeights[ZT_QOS_THM_IDX] = 0.0f; _qualityWeights[ZT_QOS_THV_IDX] = 0.0f; _qualityWeights[ZT_QOS_SCP_IDX] = 0.0f; break; @@ -1637,6 +1632,8 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool break; } + /* If a user has specified custom parameters for this bonding policy, overlay + them onto the defaults that were previously set */ if (useTemplate) { _policyAlias = templateBond->_policyAlias; _failoverInterval = templateBond->_failoverInterval; @@ -1742,7 +1739,7 @@ void Bond::dumpInfo(const int64_t now) fprintf(stderr, "Paths (bp=%d, stats=%d, fh=%d) :\n", _policy, _shouldCollectPathStatistics, _allowFlowHashing); }*/ - if ((now - _lastPrintTS) < 1000) { + if ((now - _lastPrintTS) < 2000) { return; } _lastPrintTS = now; @@ -1856,30 +1853,7 @@ void Bond::dumpInfo(const int64_t now) currPathStr); } } - /* - if (_allowFlowHashing) { - //Mutex::Lock _l(_flows_m); - if (_flows.size()) { - fprintf(stderr, "\nFlows:\n"); - std::map >::iterator it = _flows.begin(); - while (it != _flows.end()) { - it->second->assignedPath()->address().toString(currPathStr); - SharedPtr slave =RR->bc->getSlaveBySocket(_policyAlias, it->second->assignedPath()->localSocket()); - fprintf(stderr, " [%4x] in=%16llu, out=%16llu, bytes=%16llu, last=%16llu, if=%8s\t\t%s\n", - it->second->id(), - it->second->bytesInPerUnitTime(), - it->second->bytesOutPerUnitTime(), - it->second->totalBytes(), - it->second->age(now), - slave->ifname().c_str(), - currPathStr); - ++it; - } - } - } - */ } - //fprintf(stderr, "\n\n\n\n\n"); } } // namespace ZeroTier \ No newline at end of file diff --git a/node/Bond.hpp b/node/Bond.hpp index e60e27a19..353ed9317 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -87,7 +87,7 @@ public: std::string policyAlias() { return _policyAlias; } /** - * Inform the bond about the path that its peer just learned about + * Inform the bond about the path that its peer (owning object) just learned about * * @param path Newly-learned Path which should now be handled by the Bond * @param now Current time @@ -434,7 +434,12 @@ public: inline void setFailoverInterval(uint32_t interval) { _failoverInterval = interval; } /** - * @param strategy The strategy that the bond uses to prob for path aliveness and quality + * @param strategy Strategy that the bond uses to re-assign protocol flows. + */ + inline void setFlowRebalanceStrategy(uint32_t strategy) { _flowRebalanceStrategy = strategy; } + + /** + * @param strategy Strategy that the bond uses to prob for path aliveness and quality */ inline void setSlaveMonitorStrategy(uint8_t strategy) { _slaveMonitorStrategy = strategy; } @@ -578,6 +583,7 @@ private: // balance-aware uint64_t _totalBondUnderload; + uint8_t _flowRebalanceStrategy; // dynamic slave monitoring uint8_t _slaveMonitorStrategy; diff --git a/node/BondController.cpp b/node/BondController.cpp index 06da41759..6b21d9998 100644 --- a/node/BondController.cpp +++ b/node/BondController.cpp @@ -11,6 +11,7 @@ */ /****/ +#include "Constants.hpp" #include "BondController.hpp" #include "Peer.hpp" @@ -23,6 +24,7 @@ BondController::BondController(const RuntimeEnvironment *renv) : RR(renv) { bondStartTime = RR->node->now(); + _defaultBondingPolicy = ZT_BONDING_POLICY_NONE; } bool BondController::slaveAllowed(std::string &policyAlias, SharedPtr slave) @@ -83,10 +85,9 @@ SharedPtr BondController::createTransportTriggeredBond(const RuntimeEnviro Bond *bond = nullptr; if (!_bonds.count(identity)) { std::string policyAlias; - int _defaultBondingPolicy = defaultBondingPolicy(); fprintf(stderr, "new bond, registering for %llx\n", identity); if (!_policyTemplateAssignments.count(identity)) { - if (defaultBondingPolicy()) { + if (_defaultBondingPolicy) { fprintf(stderr, " no assignment, using default (%d)\n", _defaultBondingPolicy); bond = new Bond(renv, _defaultBondingPolicy, peer); } diff --git a/node/Constants.hpp b/node/Constants.hpp index c27e02319..9f2cd80a5 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -341,11 +341,6 @@ */ #define ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE 32 -/** - * Number of samples to consider when processing long-term trends - */ -#define ZT_QOS_LONGTERM_SAMPLE_WIN_SIZE (ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE * 4) - /** * Max allowable time spent in any queue (in ms) */ diff --git a/node/Flow.hpp b/node/Flow.hpp index 5994a4fb2..b19fd475c 100644 --- a/node/Flow.hpp +++ b/node/Flow.hpp @@ -116,6 +116,7 @@ struct Flow int64_t _lastActivity; int64_t _lastPathReassignment; SharedPtr _assignedPath; + SharedPtr _previouslyAssignedPath; }; } // namespace ZeroTier diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index 702c08090..43e36f3ce 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -221,9 +221,11 @@ bool IncomingPacket::_doACK(const RuntimeEnvironment *RR,void *tPtr,const Shared bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr &peer) { SharedPtr bond = peer->bond(); + /* TODO: Fix rate gate issue if (!bond || !bond->rateGateQoS(RR->node->now())) { return true; } + */ /* Dissect incoming QoS packet. From this we can compute latency values and their variance. * The latency variance is used as a measure of "jitter". */ if (payloadLength() > ZT_QOS_MAX_PACKET_SIZE || payloadLength() < ZT_QOS_MIN_PACKET_SIZE) { diff --git a/node/Peer.cpp b/node/Peer.cpp index 1ee0c1240..30911b43c 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -55,7 +55,8 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident _remoteMultipathSupported(false), _canUseMultipath(false), _shouldCollectPathStatistics(0), - _lastComputedAggregateMeanLatency(0) + _lastComputedAggregateMeanLatency(0), + _bondingPolicy(0) { if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH)) { throw ZT_EXCEPTION_INVALID_ARGUMENT; diff --git a/service/OneService.cpp b/service/OneService.cpp index ab8594eec..ec24f7ade 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -1621,6 +1621,7 @@ public: // Bond-specific properties newTemplateBond->setUpDelay(OSUtils::jsonInt(customPolicy["upDelay"],-1)); newTemplateBond->setDownDelay(OSUtils::jsonInt(customPolicy["downDelay"],-1)); + newTemplateBond->setFlowRebalanceStrategy(OSUtils::jsonInt(customPolicy["flowRebalanceStrategy"],(uint64_t)0)); newTemplateBond->setFailoverInterval(OSUtils::jsonInt(customPolicy["failoverInterval"],(uint64_t)0)); newTemplateBond->setPacketsPerSlave(OSUtils::jsonInt(customPolicy["packetsPerSlave"],-1)); std::string slaveMonitorStrategyStr(OSUtils::jsonString(customPolicy["slaveMonitorStrategy"],""));