Merge pull request #2238 from zerotier/jh-multipath-improvements

Port multipath improvements to newer version
This commit is contained in:
Joseph Henry 2024-03-05 10:59:05 -08:00 committed by GitHub
commit 08d85d4ae1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 149 additions and 55 deletions

View File

@ -1347,8 +1347,16 @@ typedef struct
*/ */
char ifname[ZT_MAX_PHYSIFNAME]; char ifname[ZT_MAX_PHYSIFNAME];
/**
* Pointer to PhySocket object for this path
*/
uint64_t localSocket; uint64_t localSocket;
/**
* Local port corresponding to this path's localSocket
*/
uint16_t localPort;
/** /**
* Is path expired? * Is path expired?
*/ */

View File

@ -15,10 +15,10 @@
#include "Switch.hpp" #include "Switch.hpp"
#include <cinttypes> // for PRId64, etc. macros
#include <cmath> #include <cmath>
#include <cstdio> #include <cstdio>
#include <string> #include <string>
#include <cinttypes> // for PRId64, etc. macros
// FIXME: remove this suppression and actually fix warnings // FIXME: remove this suppression and actually fix warnings
#ifdef __GNUC__ #ifdef __GNUC__
@ -108,7 +108,7 @@ bool Bond::setAllMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::s
std::map<int64_t, SharedPtr<Bond> >::iterator bondItr = _bonds.begin(); std::map<int64_t, SharedPtr<Bond> >::iterator bondItr = _bonds.begin();
bool found = false; bool found = false;
while (bondItr != _bonds.end()) { while (bondItr != _bonds.end()) {
if (bondItr->second->setMtuByTuple(mtu,ifStr,ipStr)) { if (bondItr->second->setMtuByTuple(mtu, ifStr, ipStr)) {
found = true; found = true;
} }
++bondItr; ++bondItr;
@ -154,11 +154,13 @@ SharedPtr<Bond> Bond::createBond(const RuntimeEnvironment* renv, const SharedPtr
bond = new Bond(renv, _bondPolicyTemplates[_defaultPolicyStr].ptr(), peer); bond = new Bond(renv, _bondPolicyTemplates[_defaultPolicyStr].ptr(), peer);
bond->debug("new default custom bond (based on %s)", bond->getPolicyStrByCode(bond->policy()).c_str()); bond->debug("new default custom bond (based on %s)", bond->getPolicyStrByCode(bond->policy()).c_str());
} }
} else { }
else {
if (! _bondPolicyTemplates[_policyTemplateAssignments[identity]]) { if (! _bondPolicyTemplates[_policyTemplateAssignments[identity]]) {
bond = new Bond(renv, _defaultPolicy, peer); bond = new Bond(renv, _defaultPolicy, peer);
bond->debug("peer-specific bond, was specified as %s but the bond definition was not found, using default %s", _policyTemplateAssignments[identity].c_str(), getPolicyStrByCode(_defaultPolicy).c_str()); bond->debug("peer-specific bond, was specified as %s but the bond definition was not found, using default %s", _policyTemplateAssignments[identity].c_str(), getPolicyStrByCode(_defaultPolicy).c_str());
} else { }
else {
bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer); bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer);
bond->debug("new default bond"); bond->debug("new default bond");
} }
@ -227,10 +229,12 @@ SharedPtr<Link> Bond::getLinkBySocket(const std::string& policyAlias, uint64_t l
SharedPtr<Link> s = new Link(ifnameStr, 0, 0, 0, true, ZT_BOND_SLAVE_MODE_PRIMARY, ""); SharedPtr<Link> s = new Link(ifnameStr, 0, 0, 0, true, ZT_BOND_SLAVE_MODE_PRIMARY, "");
_interfaceToLinkMap[policyAlias].insert(std::pair<std::string, SharedPtr<Link> >(ifnameStr, s)); _interfaceToLinkMap[policyAlias].insert(std::pair<std::string, SharedPtr<Link> >(ifnameStr, s));
return s; return s;
} else { }
else {
return SharedPtr<Link>(); return SharedPtr<Link>();
} }
} else { }
else {
return search->second; return search->second;
} }
} }
@ -340,6 +344,7 @@ void Bond::nominatePathToBond(const SharedPtr<Path>& path, int64_t now)
_paths[i].ipvPref = sl->ipvPref(); _paths[i].ipvPref = sl->ipvPref();
_paths[i].mode = sl->mode(); _paths[i].mode = sl->mode();
_paths[i].enabled = sl->enabled(); _paths[i].enabled = sl->enabled();
_paths[i].localPort = _phy->getLocalPort((PhySocket*)((uintptr_t)path->localSocket()));
_paths[i].onlyPathOnLink = ! bFoundCommonLink; _paths[i].onlyPathOnLink = ! bFoundCommonLink;
} }
} }
@ -397,7 +402,8 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
_rrPacketsSentOnCurrLink = 0; _rrPacketsSentOnCurrLink = 0;
if (_numBondedPaths == 1 || _rrIdx >= (ZT_MAX_PEER_NETWORK_PATHS - 1)) { if (_numBondedPaths == 1 || _rrIdx >= (ZT_MAX_PEER_NETWORK_PATHS - 1)) {
_rrIdx = 0; _rrIdx = 0;
} else { }
else {
int _tempIdx = _rrIdx; int _tempIdx = _rrIdx;
for (int searchCount = 0; searchCount < (_numBondedPaths - 1); searchCount++) { for (int searchCount = 0; searchCount < (_numBondedPaths - 1); searchCount++) {
_tempIdx = (_tempIdx == (_numBondedPaths - 1)) ? 0 : _tempIdx + 1; _tempIdx = (_tempIdx == (_numBondedPaths - 1)) ? 0 : _tempIdx + 1;
@ -427,7 +433,8 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
if (likely(it != _flows.end())) { if (likely(it != _flows.end())) {
it->second->lastActivity = now; it->second->lastActivity = now;
return _paths[it->second->assignedPath].p; return _paths[it->second->assignedPath].p;
} else { }
else {
unsigned char entropy; unsigned char entropy;
Utils::getSecureRandom(&entropy, 1); Utils::getSecureRandom(&entropy, 1);
SharedPtr<Flow> flow = createFlow(ZT_MAX_PEER_NETWORK_PATHS, flowId, entropy, now); SharedPtr<Flow> flow = createFlow(ZT_MAX_PEER_NETWORK_PATHS, flowId, entropy, now);
@ -505,7 +512,8 @@ void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId,
_paths[pathIdx].qosStatsIn[packetId] = now; _paths[pathIdx].qosStatsIn[packetId] = now;
++(_paths[pathIdx].packetsReceivedSinceLastQoS); ++(_paths[pathIdx].packetsReceivedSinceLastQoS);
//_paths[pathIdx].packetValiditySamples.push(true); //_paths[pathIdx].packetValiditySamples.push(true);
} else { }
else {
// debug("QoS buffer full, will not record information"); // debug("QoS buffer full, will not record information");
} }
/* /*
@ -532,7 +540,8 @@ void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId,
SharedPtr<Flow> flow; SharedPtr<Flow> flow;
if (! _flows.count(flowId)) { if (! _flows.count(flowId)) {
flow = createFlow(pathIdx, flowId, 0, now); flow = createFlow(pathIdx, flowId, 0, now);
} else { }
else {
flow = _flows[flowId]; flow = _flows[flowId];
} }
if (flow) { if (flow) {
@ -618,7 +627,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass
if (reassign) { if (reassign) {
log("attempting to re-assign out-flow %04x previously on idx %d (%u / %zu flows)", flow->id, flow->assignedPath, _paths[_realIdxMap[flow->assignedPath]].assignedFlowCount, _flows.size()); log("attempting to re-assign out-flow %04x previously on idx %d (%u / %zu flows)", flow->id, flow->assignedPath, _paths[_realIdxMap[flow->assignedPath]].assignedFlowCount, _flows.size());
} else { }
else {
debug("attempting to assign flow for the first time"); debug("attempting to assign flow for the first time");
} }
@ -632,7 +642,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass
if (reassign) { if (reassign) {
bondedIdx = (flow->assignedPath + offset) % (_numBondedPaths); bondedIdx = (flow->assignedPath + offset) % (_numBondedPaths);
} else { }
else {
bondedIdx = abs((int)((entropy + offset) % (_numBondedPaths))); bondedIdx = abs((int)((entropy + offset) % (_numBondedPaths)));
} }
// debug("idx=%d, offset=%d, randomCap=%f, actualCap=%f", bondedIdx, offset, randomLinkCapacity, _paths[_realIdxMap[bondedIdx]].relativeLinkCapacity); // debug("idx=%d, offset=%d, randomCap=%f, actualCap=%f", bondedIdx, offset, randomLinkCapacity, _paths[_realIdxMap[bondedIdx]].relativeLinkCapacity);
@ -655,7 +666,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass
flow->assignPath(_realIdxMap[bondedIdx], now); flow->assignPath(_realIdxMap[bondedIdx], now);
++(_paths[_realIdxMap[bondedIdx]].assignedFlowCount); ++(_paths[_realIdxMap[bondedIdx]].assignedFlowCount);
// debug(" ABLE to find optimal link %f idx %d", _paths[_realIdxMap[bondedIdx]].relativeQuality, bondedIdx); // debug(" ABLE to find optimal link %f idx %d", _paths[_realIdxMap[bondedIdx]].relativeQuality, bondedIdx);
} else { }
else {
// We were (unable) to find a path that didn't violate at least one quality requirement, will choose next best option // We were (unable) to find a path that didn't violate at least one quality requirement, will choose next best option
flow->assignPath(_realIdxMap[nextBestQualIdx], now); flow->assignPath(_realIdxMap[nextBestQualIdx], now);
++(_paths[_realIdxMap[nextBestQualIdx]].assignedFlowCount); ++(_paths[_realIdxMap[nextBestQualIdx]].assignedFlowCount);
@ -715,11 +727,13 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now)
debug("forget flow %04x (age %" PRId64 ") (%u / %zu)", it->first, it->second->age(now), _paths[it->second->assignedPath].assignedFlowCount, (_flows.size() - 1)); debug("forget flow %04x (age %" PRId64 ") (%u / %zu)", it->first, it->second->age(now), _paths[it->second->assignedPath].assignedFlowCount, (_flows.size() - 1));
_paths[it->second->assignedPath].assignedFlowCount--; _paths[it->second->assignedPath].assignedFlowCount--;
it = _flows.erase(it); it = _flows.erase(it);
} else { }
else {
++it; ++it;
} }
} }
} else if (oldest) { // Remove single oldest by natural expiration }
else if (oldest) { // Remove single oldest by natural expiration
uint64_t maxAge = 0; uint64_t maxAge = 0;
while (it != _flows.end()) { while (it != _flows.end()) {
if (it->second->age(now) > maxAge) { if (it->second->age(now) > maxAge) {
@ -766,7 +780,8 @@ void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path>&
if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) { if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) {
debug("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr); debug("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
_negotiatedPathIdx = pathIdx; _negotiatedPathIdx = pathIdx;
} else { }
else {
debug("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr); debug("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
} }
} }
@ -881,7 +896,8 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con
if (atAddress) { if (atAddress) {
outp.armor(_peer->key(), false, _peer->aesKeysIfSupported()); outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size()); RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size());
} else { }
else {
RR->sw->send(tPtr, outp, false); RR->sw->send(tPtr, outp, false);
} }
Metrics::pkt_qos_out++; Metrics::pkt_qos_out++;
@ -1222,7 +1238,8 @@ void Bond::estimatePathQuality(int64_t now)
if ((now - it->second) >= qosRecordTimeout) { if ((now - it->second) >= qosRecordTimeout) {
it = _paths[i].qosStatsOut.erase(it); it = _paths[i].qosStatsOut.erase(it);
++numDroppedQosOutRecords; ++numDroppedQosOutRecords;
} else { }
else {
++it; ++it;
} }
} }
@ -1250,7 +1267,8 @@ void Bond::estimatePathQuality(int64_t now)
if ((now - it->second) >= qosRecordTimeout) { if ((now - it->second) >= qosRecordTimeout) {
it = _paths[i].qosStatsIn.erase(it); it = _paths[i].qosStatsIn.erase(it);
++numDroppedQosInRecords; ++numDroppedQosInRecords;
} else { }
else {
++it; ++it;
} }
} }
@ -1327,10 +1345,10 @@ void Bond::estimatePathQuality(int64_t now)
continue; continue;
} }
// Compute/Smooth average of real-world observations // Compute/Smooth average of real-world observations
if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) { if (_paths[i].latencySamples.count() >= ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE) {
_paths[i].latency = _paths[i].latencySamples.mean(); _paths[i].latency = _paths[i].latencySamples.mean();
} }
if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) { if (_paths[i].latencySamples.count() >= ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE) {
_paths[i].latencyVariance = _paths[i].latencySamples.stddev(); _paths[i].latencyVariance = _paths[i].latencySamples.stddev();
} }
@ -1344,6 +1362,7 @@ void Bond::estimatePathQuality(int64_t now)
//_paths[i].packetErrorRatio = 1.0 - (_paths[i].packetValiditySamples.count() ? _paths[i].packetValiditySamples.mean() : 1.0); //_paths[i].packetErrorRatio = 1.0 - (_paths[i].packetValiditySamples.count() ? _paths[i].packetValiditySamples.mean() : 1.0);
// _valid is written elsewhere // _valid is written elsewhere
_paths[i].p->_relativeQuality = _paths[i].relativeQuality; _paths[i].p->_relativeQuality = _paths[i].relativeQuality;
_paths[i].p->_localPort = _paths[i].localPort;
} }
// Flag links for avoidance // Flag links for avoidance
@ -1370,7 +1389,8 @@ void Bond::estimatePathQuality(int64_t now)
shouldAvoid = true; shouldAvoid = true;
} }
_paths[i].shouldAvoid = shouldAvoid; _paths[i].shouldAvoid = shouldAvoid;
} else { }
else {
if (! shouldAvoid) { if (! shouldAvoid) {
log("no longer avoiding link %s", pathToStr(_paths[i].p).c_str()); log("no longer avoiding link %s", pathToStr(_paths[i].p).c_str());
_paths[i].shouldAvoid = false; _paths[i].shouldAvoid = false;
@ -1482,7 +1502,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
_lastBondStatusLog = now; _lastBondStatusLog = now;
if (_abPathIdx == ZT_MAX_PEER_NETWORK_PATHS) { if (_abPathIdx == ZT_MAX_PEER_NETWORK_PATHS) {
log("no active link"); log("no active link");
} else if (_paths[_abPathIdx].p) { }
else if (_paths[_abPathIdx].p) {
log("active link is %s, failover queue size is %zu", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size()); log("active link is %s, failover queue size is %zu", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size());
} }
if (_abFailoverQueue.empty()) { if (_abFailoverQueue.empty()) {
@ -1590,7 +1611,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size()); log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size());
} }
continue; continue;
} else { }
else {
++it; ++it;
} }
} }
@ -1739,7 +1761,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
if (! _abFailoverQueue.empty()) { if (! _abFailoverQueue.empty()) {
dequeueNextActiveBackupPath(now); dequeueNextActiveBackupPath(now);
log("active link switched to %s", pathToStr(_paths[_abPathIdx].p).c_str()); log("active link switched to %s", pathToStr(_paths[_abPathIdx].p).c_str());
} else { }
else {
log("failover queue is empty, no links to choose from"); log("failover queue is empty, no links to choose from");
} }
} }
@ -1785,7 +1808,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
dequeueNextActiveBackupPath(now); dequeueNextActiveBackupPath(now);
_lastPathNegotiationCheck = now; _lastPathNegotiationCheck = now;
log("switch negotiated link %s (select mode: optimize)", pathToStr(_paths[_abPathIdx].p).c_str()); log("switch negotiated link %s (select mode: optimize)", pathToStr(_paths[_abPathIdx].p).c_str());
} else { }
else {
// Try to find a better path and automatically switch to it -- not too often, though. // Try to find a better path and automatically switch to it -- not too often, though.
if ((now - _lastActiveBackupPathChange) > ZT_BOND_OPTIMIZE_INTERVAL) { if ((now - _lastActiveBackupPathChange) > ZT_BOND_OPTIMIZE_INTERVAL) {
if (! _abFailoverQueue.empty()) { if (! _abFailoverQueue.empty()) {
@ -1901,7 +1925,7 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
} }
if (! _isLeaf) { if (! _isLeaf) {
_policy = ZT_BOND_POLICY_ACTIVE_BACKUP; _policy = ZT_BOND_POLICY_NONE;
} }
// Timer geometry // Timer geometry

View File

@ -315,7 +315,6 @@ class Peer;
class Bond { class Bond {
public: public:
/** /**
* Stop bond's internal functions (can be resumed) * Stop bond's internal functions (can be resumed)
*/ */
@ -909,7 +908,8 @@ class Bond {
_lastAckRateCheck = now; _lastAckRateCheck = now;
if (_ackCutoffCount > numToDrain) { if (_ackCutoffCount > numToDrain) {
_ackCutoffCount -= numToDrain; _ackCutoffCount -= numToDrain;
} else { }
else {
_ackCutoffCount = 0; _ackCutoffCount = 0;
} }
return (_ackCutoffCount < ZT_ACK_CUTOFF_LIMIT); return (_ackCutoffCount < ZT_ACK_CUTOFF_LIMIT);
@ -928,7 +928,8 @@ class Bond {
uint64_t diff = now - _lastQoSRateCheck; uint64_t diff = now - _lastQoSRateCheck;
if ((diff) <= (_qosSendInterval / ZT_MAX_PEER_NETWORK_PATHS)) { if ((diff) <= (_qosSendInterval / ZT_MAX_PEER_NETWORK_PATHS)) {
++_qosCutoffCount; ++_qosCutoffCount;
} else { }
else {
_qosCutoffCount = 0; _qosCutoffCount = 0;
} }
_lastQoSRateCheck = now; _lastQoSRateCheck = now;
@ -948,7 +949,8 @@ class Bond {
int diff = now - _lastPathNegotiationReceived; int diff = now - _lastPathNegotiationReceived;
if ((diff) <= (ZT_PATH_NEGOTIATION_CUTOFF_TIME / ZT_MAX_PEER_NETWORK_PATHS)) { if ((diff) <= (ZT_PATH_NEGOTIATION_CUTOFF_TIME / ZT_MAX_PEER_NETWORK_PATHS)) {
++_pathNegotiationCutoffCount; ++_pathNegotiationCutoffCount;
} else { }
else {
_pathNegotiationCutoffCount = 0; _pathNegotiationCutoffCount = 0;
} }
_lastPathNegotiationReceived = now; _lastPathNegotiationReceived = now;
@ -1230,6 +1232,7 @@ class Bond {
, packetsReceivedSinceLastQoS(0) , packetsReceivedSinceLastQoS(0)
, packetsIn(0) , packetsIn(0)
, packetsOut(0) , packetsOut(0)
, localPort(0)
{ {
} }
@ -1245,17 +1248,20 @@ class Bond {
unsigned int suggestedRefractoryPeriod = refractoryPeriod ? punishment + (refractoryPeriod * 2) : punishment; unsigned int suggestedRefractoryPeriod = refractoryPeriod ? punishment + (refractoryPeriod * 2) : punishment;
refractoryPeriod = std::min(suggestedRefractoryPeriod, (unsigned int)ZT_BOND_MAX_REFRACTORY_PERIOD); refractoryPeriod = std::min(suggestedRefractoryPeriod, (unsigned int)ZT_BOND_MAX_REFRACTORY_PERIOD);
lastRefractoryUpdate = 0; lastRefractoryUpdate = 0;
} else { }
else {
uint32_t drainRefractory = 0; uint32_t drainRefractory = 0;
if (lastRefractoryUpdate) { if (lastRefractoryUpdate) {
drainRefractory = (now - lastRefractoryUpdate); drainRefractory = (now - lastRefractoryUpdate);
} else { }
else {
drainRefractory = (now - lastAliveToggle); drainRefractory = (now - lastAliveToggle);
} }
lastRefractoryUpdate = now; lastRefractoryUpdate = now;
if (refractoryPeriod > drainRefractory) { if (refractoryPeriod > drainRefractory) {
refractoryPeriod -= drainRefractory; refractoryPeriod -= drainRefractory;
} else { }
else {
refractoryPeriod = 0; refractoryPeriod = 0;
lastRefractoryUpdate = 0; lastRefractoryUpdate = 0;
} }
@ -1292,7 +1298,6 @@ class Bond {
*/ */
inline bool needsToSendQoS(int64_t now, uint64_t qosSendInterval) inline bool needsToSendQoS(int64_t now, uint64_t qosSendInterval)
{ {
// fprintf(stderr, "QOS table (%d / %d)\n", packetsReceivedSinceLastQoS, ZT_QOS_TABLE_SIZE);
return ((packetsReceivedSinceLastQoS >= ZT_QOS_TABLE_SIZE) || ((now - lastQoSMeasurement) > qosSendInterval)) && packetsReceivedSinceLastQoS; return ((packetsReceivedSinceLastQoS >= ZT_QOS_TABLE_SIZE) || ((now - lastQoSMeasurement) > qosSendInterval)) && packetsReceivedSinceLastQoS;
} }
@ -1364,6 +1369,8 @@ class Bond {
int packetsIn; int packetsIn;
int packetsOut; int packetsOut;
uint16_t localPort;
// AtomicCounter __refCount; // AtomicCounter __refCount;
SharedPtr<Path> p; SharedPtr<Path> p;

View File

@ -399,6 +399,11 @@
*/ */
#define ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE 64 #define ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE 64
/**
 * Minimum number of samples that must be present in a short-term QoS
 * sample window before statistics summaries are computed from it.
 *
 * The bonding layer compares a window's sample count against this value
 * (count >= this value) before refreshing summaries such as the latency
 * mean and standard deviation, so summaries become available before the
 * window has filled to ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE.
 */
#define ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE 4
/** /**
* Max allowable time spent in any queue (in ms) * Max allowable time spent in any queue (in ms)
*/ */

View File

@ -578,6 +578,7 @@ ZT_PeerList *Node::peers() const
if((*path)->valid()) { if((*path)->valid()) {
memcpy(&(p->paths[p->pathCount].address),&((*path)->address()),sizeof(struct sockaddr_storage)); memcpy(&(p->paths[p->pathCount].address),&((*path)->address()),sizeof(struct sockaddr_storage));
p->paths[p->pathCount].localSocket = (*path)->localSocket(); p->paths[p->pathCount].localSocket = (*path)->localSocket();
p->paths[p->pathCount].localPort = (*path)->localPort();
p->paths[p->pathCount].lastSend = (*path)->lastOut(); p->paths[p->pathCount].lastSend = (*path)->lastOut();
p->paths[p->pathCount].lastReceive = (*path)->lastIn(); p->paths[p->pathCount].lastReceive = (*path)->lastIn();
p->paths[p->pathCount].trustedPathId = RR->topology->getOutboundPathTrust((*path)->address()); p->paths[p->pathCount].trustedPathId = RR->topology->getOutboundPathTrust((*path)->address());

View File

@ -84,6 +84,7 @@ public:
_lastIn(0), _lastIn(0),
_lastTrustEstablishedPacketReceived(0), _lastTrustEstablishedPacketReceived(0),
_lastEchoRequestReceived(0), _lastEchoRequestReceived(0),
_localPort(0),
_localSocket(-1), _localSocket(-1),
_latencyMean(0.0), _latencyMean(0.0),
_latencyVariance(0.0), _latencyVariance(0.0),
@ -106,6 +107,7 @@ public:
_lastIn(0), _lastIn(0),
_lastTrustEstablishedPacketReceived(0), _lastTrustEstablishedPacketReceived(0),
_lastEchoRequestReceived(0), _lastEchoRequestReceived(0),
_localPort(0),
_localSocket(localSocket), _localSocket(localSocket),
_latencyMean(0.0), _latencyMean(0.0),
_latencyVariance(0.0), _latencyVariance(0.0),
@ -177,6 +179,11 @@ public:
*/ */
inline int64_t localSocket() const { return _localSocket; } inline int64_t localSocket() const { return _localSocket; }
/**
 * Accessor for the local port associated with this path's localSocket.
 *
 * The port is stored as a 16-bit unsigned value and is widened to int64_t
 * on return; it is 0 after construction until something populates it.
 *
 * @return Local port corresponding to the localSocket
 */
inline int64_t localPort() const
{
	return static_cast<int64_t>(_localPort);
}
/** /**
* @return Physical address * @return Physical address
*/ */
@ -375,6 +382,7 @@ private:
int64_t _lastEchoRequestReceived; int64_t _lastEchoRequestReceived;
uint16_t _localPort;
int64_t _localSocket; int64_t _localSocket;
volatile float _latencyMean; volatile float _latencyMean;

27
one.cpp
View File

@ -591,7 +591,7 @@ static int cli(int argc,char **argv)
printf("200 setmtu OK" ZT_EOL_S); printf("200 setmtu OK" ZT_EOL_S);
return 0; return 0;
} else { } else {
printf("no link match found, new MTU was not applied" ZT_EOL_S); printf("%d Failed to set MTU: %s" ZT_EOL_S, scode, responseBody.c_str());
return 1; return 1;
} }
return 0; return 0;
@ -664,37 +664,37 @@ static int cli(int argc,char **argv)
printf("\nidx" printf("\nidx"
" interface" " interface"
" " " "
"path socket\n"); "path socket local port\n");
for(int i=0; i<100; i++) { printf("-"); } for(int i=0; i<120; i++) { printf("-"); }
printf("\n"); printf("\n");
for (int i=0; i<p.size(); i++) for (int i=0; i<p.size(); i++)
{ {
printf("%2d: %26s %51s %.16llx\n", printf("%2d: %26s %51s %.16llx %12d\n",
i, i,
OSUtils::jsonString(p[i]["ifname"],"-").c_str(), OSUtils::jsonString(p[i]["ifname"],"-").c_str(),
OSUtils::jsonString(p[i]["address"],"-").c_str(), OSUtils::jsonString(p[i]["address"],"-").c_str(),
(unsigned long long)OSUtils::jsonInt(p[i]["localSocket"],0) (unsigned long long)OSUtils::jsonInt(p[i]["localSocket"],0),
(uint16_t)OSUtils::jsonInt(p[i]["localPort"],0)
); );
} }
printf("\nidx lat pdv " printf("\nidx lat pdv "
"plr per capacity qual " "capacity qual "
"rx_age tx_age eligible bonded\n"); "rx_age tx_age eligible bonded flows\n");
for(int i=0; i<100; i++) { printf("-"); } for(int i=0; i<120; i++) { printf("-"); }
printf("\n"); printf("\n");
for (int i=0; i<p.size(); i++) for (int i=0; i<p.size(); i++)
{ {
printf("%2d: %8.2f %8.2f %7.4f %7.4f %10d %7.4f %11d %11d %9d %7d\n", printf("%2d: %8.2f %8.2f %10d %7.4f %11d %11d %9d %7d %7d\n",
i, i,
OSUtils::jsonDouble(p[i]["latencyMean"], 0), OSUtils::jsonDouble(p[i]["latencyMean"], 0),
OSUtils::jsonDouble(p[i]["latencyVariance"], 0), OSUtils::jsonDouble(p[i]["latencyVariance"], 0),
OSUtils::jsonDouble(p[i]["packetLossRatio"], 0),
OSUtils::jsonDouble(p[i]["packetErrorRatio"], 0),
(int)OSUtils::jsonInt(p[i]["givenLinkSpeed"], 0), (int)OSUtils::jsonInt(p[i]["givenLinkSpeed"], 0),
OSUtils::jsonDouble(p[i]["relativeQuality"], 0), OSUtils::jsonDouble(p[i]["relativeQuality"], 0),
(int)OSUtils::jsonInt(p[i]["lastInAge"], 0), (int)OSUtils::jsonInt(p[i]["lastInAge"], 0),
(int)OSUtils::jsonInt(p[i]["lastOutAge"], 0), (int)OSUtils::jsonInt(p[i]["lastOutAge"], 0),
(int)OSUtils::jsonInt(p[i]["eligible"],0), (int)OSUtils::jsonInt(p[i]["eligible"],0),
(int)OSUtils::jsonInt(p[i]["bonded"],0)); (int)OSUtils::jsonInt(p[i]["bonded"],0),
(int)OSUtils::jsonInt(p[i]["assignedFlowCount"],0));
} }
} }
} }
@ -706,6 +706,7 @@ static int cli(int argc,char **argv)
return 2; return 2;
} }
} }
/* zerotier-cli bond command was malformed in some way */ /* zerotier-cli bond command was malformed in some way */
printf("(bond) command is missing required arguments" ZT_EOL_S); printf("(bond) command is missing required arguments" ZT_EOL_S);
return 2; return 2;

View File

@ -146,6 +146,7 @@ private:
PhySocketType type; PhySocketType type;
ZT_PHY_SOCKFD_TYPE sock; ZT_PHY_SOCKFD_TYPE sock;
void *uptr; // user-settable pointer void *uptr; // user-settable pointer
uint16_t localPort;
ZT_PHY_SOCKADDR_STORAGE_TYPE saddr; // remote for TCP_OUT and TCP_IN, local for TCP_LISTEN, RAW, and UDP ZT_PHY_SOCKADDR_STORAGE_TYPE saddr; // remote for TCP_OUT and TCP_IN, local for TCP_LISTEN, RAW, and UDP
}; };
@ -244,6 +245,18 @@ public:
return &(reinterpret_cast<PhySocketImpl*>(s)->uptr); return &(reinterpret_cast<PhySocketImpl*>(s)->uptr);
} }
/**
 * Look up the local port recorded on a PhySocket
 *
 * The socket handle is reinterpreted as its PhySocketImpl record and the
 * stored localPort field is returned as-is. The pointer is dereferenced
 * without any validation, so the caller must pass a live socket object.
 *
 * NOTE(review): the visible UDP bind code only assigns localPort inside an
 * __UNIX_LIKE__ guard -- confirm what value is reported on other platforms.
 *
 * @param s Socket object (must not be null)
 *
 * @return Local port stored for this PhySocket
 */
static inline uint16_t getLocalPort(PhySocket* s) throw()
{
	PhySocketImpl* const impl = reinterpret_cast<PhySocketImpl*>(s);
	return impl->localPort;
}
/** /**
* Cause poll() to stop waiting immediately * Cause poll() to stop waiting immediately
* *
@ -417,6 +430,11 @@ public:
sws.type = ZT_PHY_SOCKET_UDP; sws.type = ZT_PHY_SOCKET_UDP;
sws.sock = s; sws.sock = s;
sws.uptr = uptr; sws.uptr = uptr;
#ifdef __UNIX_LIKE__
struct sockaddr_in *sin = (struct sockaddr_in *)localAddress;
sws.localPort = htons(sin->sin_port);
#endif
memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); memset(&(sws.saddr),0,sizeof(struct sockaddr_storage));
memcpy(&(sws.saddr),localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); memcpy(&(sws.saddr),localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));

View File

@ -637,6 +637,7 @@ static void _peerToJson(nlohmann::json &pj,const ZT_Peer *peer, SharedPtr<Bond>
j["expired"] = (bool)(peer->paths[i].expired != 0); j["expired"] = (bool)(peer->paths[i].expired != 0);
j["preferred"] = (bool)(peer->paths[i].preferred != 0); j["preferred"] = (bool)(peer->paths[i].preferred != 0);
j["localSocket"] = peer->paths[i].localSocket; j["localSocket"] = peer->paths[i].localSocket;
j["localPort"] = peer->paths[i].localPort;
if (bond && peer->isBonded) { if (bond && peer->isBonded) {
uint64_t now = OSUtils::now(); uint64_t now = OSUtils::now();
j["ifname"] = std::string(peer->paths[i].ifname); j["ifname"] = std::string(peer->paths[i].ifname);
@ -976,7 +977,7 @@ public:
if (!OSUtils::writeFile(authTokenPath.c_str(),_authToken)) { if (!OSUtils::writeFile(authTokenPath.c_str(),_authToken)) {
Mutex::Lock _l(_termReason_m); Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR; _termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = "authtoken.secret could not be written"; _fatalErrorMessage = "authtoken.secret could not be written (try running with -U to prevent dropping of privileges)";
return _termReason; return _termReason;
} else { } else {
OSUtils::lockDownFile(authTokenPath.c_str(),false); OSUtils::lockDownFile(authTokenPath.c_str(),false);
@ -996,7 +997,7 @@ public:
if (!OSUtils::writeFile(metricsTokenPath.c_str(),_metricsToken)) { if (!OSUtils::writeFile(metricsTokenPath.c_str(),_metricsToken)) {
Mutex::Lock _l(_termReason_m); Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR; _termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = "metricstoken.secret could not be written"; _fatalErrorMessage = "metricstoken.secret could not be written (try running with -U to prevent dropping of privileges)";
return _termReason; return _termReason;
} else { } else {
OSUtils::lockDownFile(metricsTokenPath.c_str(),false); OSUtils::lockDownFile(metricsTokenPath.c_str(),false);
@ -1542,7 +1543,7 @@ public:
// control plane endpoints // control plane endpoints
std::string bondShowPath = "/bond/show/([0-9a-fA-F]{10})"; std::string bondShowPath = "/bond/show/([0-9a-fA-F]{10})";
std::string bondRotatePath = "/bond/rotate/([0-9a-fA-F]{10})"; std::string bondRotatePath = "/bond/rotate/([0-9a-fA-F]{10})";
std::string setBondMtuPath = "/bond/setmtu/([0-9]{3,5})/([a-zA-Z0-9_]{1,16})/([0-9a-fA-F\\.\\:]{1,39})"; std::string setBondMtuPath = "/bond/setmtu/([0-9]{1,6})/([a-zA-Z0-9_]{1,16})/([0-9a-fA-F\\.\\:]{1,39})";
std::string configPath = "/config"; std::string configPath = "/config";
std::string configPostPath = "/config/settings"; std::string configPostPath = "/config/settings";
std::string healthPath = "/health"; std::string healthPath = "/health";
@ -1662,6 +1663,7 @@ public:
ZT_PeerList *pl = _node->peers(); ZT_PeerList *pl = _node->peers();
if (pl) { if (pl) {
bool foundBond = false;
auto id = req.matches[1]; auto id = req.matches[1];
auto out = json::object(); auto out = json::object();
uint64_t wantp = Utils::hexStrToU64(id.str().c_str()); uint64_t wantp = Utils::hexStrToU64(id.str().c_str());
@ -1671,12 +1673,18 @@ public:
if (bond) { if (bond) {
_peerToJson(out,&(pl->peers[i]),bond,(_tcpFallbackTunnel != (TcpConnection *)0)); _peerToJson(out,&(pl->peers[i]),bond,(_tcpFallbackTunnel != (TcpConnection *)0));
setContent(req, res, out.dump()); setContent(req, res, out.dump());
foundBond = true;
} else { } else {
setContent(req, res, ""); setContent(req, res, "");
res.status = 400; res.status = 400;
} }
break;
} }
} }
if (!foundBond) {
setContent(req, res, "");
res.status = 400;
}
} }
_node->freeQueryResult((void *)pl); _node->freeQueryResult((void *)pl);
}; };
@ -1711,12 +1719,21 @@ public:
auto setMtu = [&, setContent](const httplib::Request &req, httplib::Response &res) { auto setMtu = [&, setContent](const httplib::Request &req, httplib::Response &res) {
if (!_node->bondController()->inUse()) { if (!_node->bondController()->inUse()) {
setContent(req, res, ""); setContent(req, res, "Bonding layer isn't active yet");
res.status = 400;
return;
}
uint32_t mtu = atoi(req.matches[1].str().c_str());
if (mtu < 68 || mtu > 65535) {
setContent(req, res, "Specified MTU is not reasonable");
res.status = 400; res.status = 400;
return; return;
} }
uint16_t mtu = atoi(req.matches[1].str().c_str());
res.status = _node->bondController()->setAllMtuByTuple(mtu, req.matches[2].str().c_str(), req.matches[3].str().c_str()) ? 200 : 400; res.status = _node->bondController()->setAllMtuByTuple(mtu, req.matches[2].str().c_str(), req.matches[3].str().c_str()) ? 200 : 400;
if (res.status == 400) {
setContent(req, res, "Unable to find specified link");
return;
}
setContent(req, res, "{}"); setContent(req, res, "{}");
}; };
_controlPlane.Post(setBondMtuPath, setMtu); _controlPlane.Post(setBondMtuPath, setMtu);
@ -2428,8 +2445,13 @@ public:
} }
// bondingPolicy cannot be used with allowTcpFallbackRelay // bondingPolicy cannot be used with allowTcpFallbackRelay
_allowTcpFallbackRelay = OSUtils::jsonBool(settings["allowTcpFallbackRelay"],true); bool _forceTcpRelayTmp = (OSUtils::jsonBool(settings["forceTcpRelay"],false));
_forceTcpRelay = OSUtils::jsonBool(settings["forceTcpRelay"],false); bool _bondInUse = _node->bondController()->inUse();
if (_forceTcpRelayTmp && _bondInUse) {
fprintf(stderr, "Warning: forceTcpRelay cannot be used with multipath. Disabling forceTcpRelay\n");
}
_allowTcpFallbackRelay = (OSUtils::jsonBool(settings["allowTcpFallbackRelay"],true) && !_node->bondController()->inUse());
_forceTcpRelay = (_forceTcpRelayTmp && !_node->bondController()->inUse());
#ifdef ZT_TCP_FALLBACK_RELAY #ifdef ZT_TCP_FALLBACK_RELAY
_fallbackRelayAddress = InetAddress(OSUtils::jsonString(settings["tcpFallbackRelay"], ZT_TCP_FALLBACK_RELAY).c_str()); _fallbackRelayAddress = InetAddress(OSUtils::jsonString(settings["tcpFallbackRelay"], ZT_TCP_FALLBACK_RELAY).c_str());